From: Robert Varga Date: Mon, 12 Dec 2016 18:34:38 +0000 (+0100) Subject: BUG-5280: add frontend state lifecycle X-Git-Tag: release/carbon~152 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2634ed7138a343f051ff6452ccc7edd3abfc0c3a BUG-5280: add frontend state lifecycle When transitioning between roles we need to take care of proper handling of state known about frontend. This patch adds the leader/non-leader transitions, creating the state from FrontendMetadata and forgetting it when transactions are committed. Our replicated log needs to grow more entries to accurately replicate the state of the conversation between the frontend and backend, so if a member becomes the leader it has an understanding of which transactions and transaction chains have been completed (aborted, committed, purged). These are replicated before a response is sent to the frontend, so if a leader before they replicate successfully, the frontend will see them as a timeout and retry them (and be routed to the new leader). Both leader and followers are expected to keep the metadata handy: the leader keeps for the purpose of being able to generate a summarized snapshot. The followers keep it so their metadata view is consistent with the contents of the data tree. Change-Id: I72eea91ee84716cdd8a6a3521b42cca9a9393aff Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java new file mode 100644 index 0000000000..ece4720564 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * A {@link RequestException} indicating that the backend has received a request for a transaction which has already + * been closed, either via a successful commit or abort (which is indicated via {@link #isSuccessful()}. This can + * happen if the corresponding journal record is replicated, but the message to the frontend gets lost and the backed + * leader moved before the frontend retried the corresponding request. + * + * @author Robert Varga + */ +@Beta +public final class ClosedTransactionException extends RequestException { + private static final long serialVersionUID = 1L; + + private final boolean successful; + + public ClosedTransactionException(final boolean successful) { + super("Transaction has been " + (successful ? "committed" : "aborted")); + this.successful = successful; + } + + @Override + public boolean isRetriable() { + return false; + } + + public boolean isSuccessful() { + return successful; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java index bf86c8b996..d4293c4741 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java @@ -26,6 +26,6 @@ public final class DeadHistoryException extends RequestException { @Override public boolean isRetriable() { - return true; + return false; } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java index 562c0e59b9..fee439984a 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java @@ -8,6 +8,9 @@ package org.opendaylight.controller.cluster.access.commands; import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.RangeSet; +import com.google.common.primitives.UnsignedLong; import org.opendaylight.controller.cluster.access.concepts.RequestException; /** @@ -20,12 +23,19 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; public final class DeadTransactionException extends RequestException { private static final long serialVersionUID = 1L; - public DeadTransactionException(final long lastSeenTransaction) { - super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for"); + private final RangeSet purgedIdentifiers; + + public DeadTransactionException(final RangeSet purgedIdentifiers) { + super("Transactions " + purgedIdentifiers + " have been purged"); + this.purgedIdentifiers = ImmutableRangeSet.copyOf(purgedIdentifiers); } @Override public boolean isRetriable() { - return true; + return false; + } + + public RangeSet getPurgedIdentifier() { + return purgedIdentifiers; } } diff --git a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java index b4da36f472..02832ae284 100644 --- a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java +++ b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.access.commands; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableRangeSet; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.access.ABIVersion; @@ -46,7 +47,7 @@ public class ConnectClientRequestTest extends AbstractRequestTest { - private static final RequestException OBJECT = new DeadTransactionException(100); + private static final RequestException OBJECT = new DeadTransactionException(ImmutableRangeSet.of()); @Override protected void isRetriable() { - assertTrue(OBJECT.isRetriable()); + assertFalse(OBJECT.isRetriable()); } @Override protected void checkMessage() { final String message = OBJECT.getMessage(); - assertTrue("Transaction up to 100 are accounted for".equals(message)); + assertEquals("Transactions [] have been purged", message); assertNull(OBJECT.getCause()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index dbea3a1cbb..f23a8ffe98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -9,13 +9,19 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; @@ -41,12 +47,22 @@ abstract class AbstractFrontendHistory implements Identifiable transactions = new HashMap<>(); + private final RangeSet purgedTransactions; private final String persistenceId; - private final Ticker ticker; + private final ShardDataTree tree; - AbstractFrontendHistory(final String persistenceId, final Ticker ticker) { + /** + * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or + * aborted (false). We only ever shrink these. + */ + private Map closedTransactions; + + AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree, + final Map closedTransactions, final RangeSet purgedTransactions) { this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.ticker = Preconditions.checkNotNull(ticker); + this.tree = Preconditions.checkNotNull(tree); + this.closedTransactions = Preconditions.checkNotNull(closedTransactions); + this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions); } final String persistenceId() { @@ -54,45 +70,104 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { final TransactionIdentifier id = request.getTarget(); + final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); - FrontendTransaction tx; if (request instanceof TransactionPurgeRequest) { - tx = transactions.remove(id); + if (purgedTransactions.contains(ul)) { + // Retransmitted purge request: nothing to do + LOG.debug("{}: transaction {} already purged", persistenceId, id); + return new TransactionPurgeResponse(id, request.getSequence()); + } + + // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it + // to an ImmutableMap, which does not allow remove(). + if (closedTransactions.containsKey(ul)) { + tree.purgeTransaction(id, () -> { + closedTransactions.remove(ul); + if (closedTransactions.isEmpty()) { + closedTransactions = ImmutableMap.of(); + } + + purgedTransactions.add(Range.singleton(ul)); + LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + final FrontendTransaction tx = transactions.get(id); if (tx == null) { - // We have no record of the transaction, nothing to do - LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id); + // This should never happen because the purge callback removes the transaction and puts it into + // purged transactions in one go. If it does, we warn about the situation and + LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId, + id, purgedTransactions); + purgedTransactions.add(Range.singleton(ul)); return new TransactionPurgeResponse(id, request.getSequence()); } + + tx.purge(() -> { + purgedTransactions.add(Range.singleton(ul)); + transactions.remove(id); + LOG.debug("{}: finished purging transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + if (purgedTransactions.contains(ul)) { + LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions); + throw new DeadTransactionException(purgedTransactions); + } + final Boolean closed = closedTransactions.get(ul); + if (closed != null) { + final boolean successful = closed.booleanValue(); + LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful" + : "failed"); + throw new ClosedTransactionException(successful); + } + + FrontendTransaction tx = transactions.get(id); + if (tx == null) { + // The transaction does not exist and we are about to create it, check sequence number + if (request.getSequence() != 0) { + LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); + throw UNSEQUENCED_START; + } + + tx = createTransaction(request, id); + transactions.put(id, tx); } else { - tx = transactions.get(id); - if (tx == null) { - // The transaction does not exist and we are about to create it, check sequence number - if (request.getSequence() != 0) { - LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); - throw UNSEQUENCED_START; - } - - tx = createTransaction(request, id); - transactions.put(id, tx); - } else { - final Optional> maybeReplay = tx.replaySequence(request.getSequence()); - if (maybeReplay.isPresent()) { - final TransactionSuccess replay = maybeReplay.get(); - LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); - return replay; - } + final Optional> maybeReplay = tx.replaySequence(request.getSequence()); + if (maybeReplay.isPresent()) { + final TransactionSuccess replay = maybeReplay.get(); + LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); + return replay; } } return tx.handleRequest(request, envelope, now); } + void destroy(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + tree.closeTransactionChain(getIdentifier(), () -> { + envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); + }); + } + + void purge(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); + tree.purgeTransactionChain(getIdentifier(), () -> { + envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); + }); + } + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) throws RequestException { if (request instanceof CommitLocalTransactionRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java index 222280cf3b..56e11c1249 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java @@ -13,6 +13,8 @@ import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract base for transactions running on SharrdDataTree. @@ -22,12 +24,17 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @NotThreadSafe abstract class AbstractShardDataTreeTransaction implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeTransaction.class); + + private final ShardDataTreeTransactionParent parent; private final TransactionIdentifier id; private final T snapshot; private boolean closed; - AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) { + AbstractShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id, + final T snapshot) { + this.parent = Preconditions.checkNotNull(parent); this.snapshot = Preconditions.checkNotNull(snapshot); this.id = Preconditions.checkNotNull(id); } @@ -37,6 +44,10 @@ abstract class AbstractShardDataTreeTransaction return id; } + final ShardDataTreeTransactionParent getParent() { + return parent; + } + final T getSnapshot() { return snapshot; } @@ -59,11 +70,21 @@ abstract class AbstractShardDataTreeTransaction return true; } + final void abort(final Runnable callback) { + Preconditions.checkState(close(), "Transaction is already closed"); + parent.abortTransaction(this, callback); + } + + final void purge(final Runnable callback) { + if (!closed) { + LOG.warn("Purging unclosed transaction {}", id); + } + parent.purgeTransaction(id, callback); + } + @Override public final String toString() { return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot) .toString(); } - - abstract void abort(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java index cd1235b460..8a0ce605df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Collections2; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; @@ -15,6 +16,8 @@ import com.google.common.collect.TreeRangeSet; import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -22,8 +25,13 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMet import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; import org.opendaylight.yangtools.concepts.Builder; import org.opendaylight.yangtools.concepts.Identifiable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@NotThreadSafe final class FrontendClientMetadataBuilder implements Builder, Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class); + private final Map currentHistories = new HashMap<>(); private final RangeSet purgedHistories; private final ClientIdentifier identifier; @@ -55,25 +63,100 @@ final class FrontendClientMetadataBuilder implements Builder histories = new HashMap<>(); + for (FrontendHistoryMetadataBuilder e : currentHistories.values()) { + if (e.getIdentifier().getHistoryId() != 0) { + final AbstractFrontendHistory state = e.toLeaderState(shard); + Verify.verify(state instanceof LocalFrontendHistory); + histories.put(e.getIdentifier(), (LocalFrontendHistory) state); + } + } + + final AbstractFrontendHistory singleHistory; + final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get( + new LocalHistoryIdentifier(identifier, 0)); + if (singleHistoryMeta == null) { + final ShardDataTree tree = shard.getDataStore(); + singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree); + } else { + singleHistory = singleHistoryMeta.toLeaderState(shard); + } + + return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(), + TreeRangeSet.create(purgedHistories), singleHistory, histories); } - private FrontendHistoryMetadataBuilder ensureHistory(final LocalHistoryIdentifier historyId) { - return currentHistories.computeIfAbsent(historyId, FrontendHistoryMetadataBuilder::new); + private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) { + return currentHistories.get(txId.getHistoryId()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java index 6b901b4fa8..beed765a69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java @@ -8,6 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -17,18 +24,23 @@ import org.opendaylight.yangtools.concepts.Identifiable; final class FrontendHistoryMetadataBuilder implements Builder, Identifiable { + + private final Map closedTransactions; + private final RangeSet purgedTransactions; private final LocalHistoryIdentifier identifier; - private long nextTransaction; private boolean closed; FrontendHistoryMetadataBuilder(final LocalHistoryIdentifier identifier) { this.identifier = Preconditions.checkNotNull(identifier); + this.purgedTransactions = TreeRangeSet.create(); + this.closedTransactions = new HashMap<>(2); } FrontendHistoryMetadataBuilder(final ClientIdentifier clientId, final FrontendHistoryMetadata meta) { identifier = new LocalHistoryIdentifier(clientId, meta.getHistoryId(), meta.getCookie()); - nextTransaction = meta.getNextTransaction(); + closedTransactions = new HashMap<>(meta.getClosedTransactions()); + purgedTransactions = TreeRangeSet.create(meta.getPurgedTransactions()); closed = meta.isClosed(); } @@ -39,14 +51,42 @@ final class FrontendHistoryMetadataBuilder implements Builder clients = new HashMap<>(); + private final String shardName; + + FrontendMetadata(final String shardName) { + this.shardName = Preconditions.checkNotNull(shardName); + } @Override Class getSupportedType() { @@ -67,9 +75,9 @@ final class FrontendMetadata extends ShardDataTreeMetadata toLeaderState(@Nonnull final Shard shard) { + return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard))); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java index 465cfcbee1..5c0d913963 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java @@ -17,8 +17,6 @@ import org.opendaylight.controller.cluster.access.commands.ReadTransactionReques import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; @@ -55,19 +53,22 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction { } else if (request instanceof ReadTransactionRequest) { return handleReadTransaction((ReadTransactionRequest) request); } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); - } else if (request instanceof TransactionPurgeRequest) { - // No-op for now - return new TransactionPurgeResponse(request.getTarget(), request.getSequence()); + handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + return null; } else { throw new UnsupportedRequestException(request); } } - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - openTransaction.abort(); - return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence()); + @Override + void purge(final Runnable callback) { + openTransaction.purge(callback); + } + + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(request.getTarget(), + request.getSequence()))); } private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 2c2a287670..9c4d4ddbca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -32,8 +32,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; @@ -103,15 +101,18 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); - } else if (request instanceof TransactionPurgeRequest) { - // No-op for now - return new TransactionPurgeResponse(request.getTarget(), request.getSequence()); + handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + return null; } else { throw new UnsupportedRequestException(request); } } + @Override + void purge(final Runnable callback) { + openTransaction.purge(callback); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { readyCohort.preCommit(new FutureCallback() { @@ -145,11 +146,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(getIdentifier(), request.getSequence()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, + new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); + return; } readyCohort.abort(new FutureCallback() { @@ -168,7 +170,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); } }); - return null; } private void coordinatedCommit(final RequestEnvelope envelope, final long now) { @@ -298,9 +299,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { switch (maybeProto.get()) { case ABORT: - openTransaction.abort(); + openTransaction.abort(() -> replyModifySuccess(request.getSequence())); openTransaction = null; - return replyModifySuccess(request.getSequence()); + return null; case READY: ensureReady(); return replyModifySuccess(request.getSequence()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 1ef775e041..fccf3bf4a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -112,6 +112,9 @@ abstract class FrontendTransaction implements Identifiable handleRequest(TransactionRequest request, RequestEnvelope envelope, long now) throws RequestException; + // Final request, needs routing to the data tree, so it can persist a tombstone + abstract void purge(Runnable callback); + private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { firstReplaySequence = sequence; @@ -148,4 +151,5 @@ abstract class FrontendTransaction implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); // Histories which have not been purged - private final Map localHistories = new HashMap<>(); + private final Map localHistories; // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories = TreeRangeSet.create(); + private final RangeSet purgedHistories; // Used for all standalone transactions private final AbstractFrontendHistory standaloneHistory; @@ -61,7 +61,6 @@ final class LeaderFrontendState implements Identifiable { private long expectedTxSequence; private Long lastSeenHistory = null; - // TODO: explicit failover notification // Record the ActorRef for the originating actor and when we switch to being a leader send a notification // to the frontend client -- that way it can immediately start sending requests @@ -72,10 +71,19 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId, + clientId, tree), new HashMap<>()); + } + + LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final RangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); this.tree = Preconditions.checkNotNull(tree); - standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree); + this.purgedHistories = Preconditions.checkNotNull(purgedHistories); + this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory); + this.localHistories = Preconditions.checkNotNull(localHistories); } @Override @@ -133,7 +141,7 @@ final class LeaderFrontendState implements Identifiable { lastSeenHistory = id.getHistoryId(); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id))); + localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id)); LOG.debug("{}: created history {}", persistenceId, id); return new LocalHistorySuccess(id, request.getSequence()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java index 8e32c76ba7..94c0965c00 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -8,15 +8,16 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; -import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.HashMap; +import java.util.Map; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -25,20 +26,28 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ final class LocalFrontendHistory extends AbstractFrontendHistory { - private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class); - private final ShardDataTreeTransactionChain chain; - private final ShardDataTree tree; - - private Long lastSeenTransaction; - LocalFrontendHistory(final String persistenceId, final ShardDataTree tree, - final ShardDataTreeTransactionChain chain) { - super(persistenceId, tree.ticker()); - this.tree = Preconditions.checkNotNull(tree); + private LocalFrontendHistory(final String persistenceId, final ShardDataTree tree, + final ShardDataTreeTransactionChain chain, final Map closedTransactions, + final RangeSet purgedTransactions) { + super(persistenceId, tree, closedTransactions, purgedTransactions); this.chain = Preconditions.checkNotNull(chain); } + static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree, + final LocalHistoryIdentifier historyId) { + return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(), + TreeRangeSet.create()); + } + + static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree, + final ShardDataTreeTransactionChain chain, final Map closedTransactions, + final RangeSet purgedTransactions) { + return new LocalFrontendHistory(persistenceId, tree, chain, new HashMap<>(closedTransactions), + TreeRangeSet.create(purgedTransactions)); + } + @Override public LocalHistoryIdentifier getIdentifier() { return chain.getIdentifier(); @@ -46,23 +55,17 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { @Override FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException { - checkDeadTransaction(id); - lastSeenTransaction = id.getTransactionId(); return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id)); } @Override FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException { - checkDeadTransaction(id); - lastSeenTransaction = id.getTransactionId(); return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id)); } @Override FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod) throws RequestException { - checkDeadTransaction(id); - lastSeenTransaction = id.getTransactionId(); return FrontendReadWriteTransaction.createReady(this, id, mod); } @@ -70,29 +73,4 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) { return chain.createReadyCohort(id, mod); } - - void destroy(final long sequence, final RequestEnvelope envelope, final long now) - throws RequestException { - LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); - tree.closeTransactionChain(getIdentifier(), () -> { - envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); - }); - } - - void purge(final long sequence, final RequestEnvelope envelope, final long now) { - LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); - tree.purgeTransactionChain(getIdentifier(), () -> { - envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); - }); - } - - private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException { - // FIXME: check if this history is still open - // FIXME: check if the last transaction has been submitted - - // Transaction identifiers within a local history have to have increasing IDs - if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) { - throw new DeadTransactionException(lastSeenTransaction); - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java index 040a652a0f..4df1352b19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java @@ -11,12 +11,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction { - ReadOnlyShardDataTreeTransaction(final TransactionIdentifier id, final DataTreeSnapshot snapshot) { - super(id, snapshot); - } - - @Override - void abort() { - close(); + ReadOnlyShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id, + final DataTreeSnapshot snapshot) { + super(parent, id, snapshot); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index 492f6ec9f6..fe5dba8a38 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -12,24 +12,14 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { - private final ShardDataTreeTransactionParent parent; - ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, - final TransactionIdentifier id, final DataTreeModification modification) { - super(id, modification); - this.parent = Preconditions.checkNotNull(parent); - } - - @Override - void abort() { - Preconditions.checkState(close(), "Transaction is already closed"); - - parent.abortTransaction(this); + ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id, + final DataTreeModification modification) { + super(parent, id, modification); } ShardDataTreeCohort ready() { Preconditions.checkState(close(), "Transaction is already closed"); - - return parent.finishTransaction(this); + return getParent().finishTransaction(this); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7ca79fc234..7915a6286e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -18,13 +18,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -164,8 +165,8 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - private final FrontendMetadata frontendMetadata = new FrontendMetadata(); - private final Map knownFrontends = new HashMap<>(); + private final FrontendMetadata frontendMetadata; + private Map knownFrontends = ImmutableMap.of(); protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -174,6 +175,7 @@ public class Shard extends RaftActor { this.name = builder.getId().toString(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + this.frontendMetadata = new FrontendMetadata(name); setPersistence(datastoreContext.isPersistent()); @@ -733,7 +735,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - store.closeAllTransactionChains(); + store.purgeLeaderState(); } if (hasLeader && !isIsolatedLeader()) { @@ -745,8 +747,18 @@ public class Shard extends RaftActor { protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); - boolean hasLeader = hasLeader(); - if (hasLeader && !isLeader()) { + final boolean hasLeader = hasLeader(); + if (!hasLeader) { + // No leader implies we are not the leader, lose frontend state if we have any. This also places + // an explicit guard so the map will not get modified accidentally. + if (!knownFrontends.isEmpty()) { + LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); + knownFrontends = ImmutableMap.of(); + } + return; + } + + if (!isLeader()) { // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -765,9 +777,13 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership " + "change and the leader address isn't available.", this); } + } else { + // We have become the leader, we need to reconstruct frontend state + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } - if (hasLeader && !isIsolatedLeader()) { + if (!isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 613f9adbc9..e8469d439d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -45,12 +45,14 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; @@ -323,6 +325,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof AbortTransactionPayload) { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload) { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { @@ -384,6 +390,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(identifier instanceof TransactionIdentifier); payloadReplicationComplete((TransactionIdentifier) identifier); } + } else if (payload instanceof AbortTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((AbortTransactionPayload) payload); + } else { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof PurgeTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeTransactionPayload) payload); + } else { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } else { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); @@ -440,12 +464,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { finishCommit(current.cohort); } + private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionAborted(txId); + } + } + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { for (ShardDataTreeMetadata m : metadata) { m.onTransactionCommitted(txId); } } + private void allMetadataPurgedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionPurged(txId); + } + } + private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) { for (ShardDataTreeMetadata m : metadata) { m.onHistoryCreated(historyId); @@ -464,6 +500,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + /** + * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, + * this method is used for re-establishing state when we are taking over + * + * @param historyId Local history identifier + * @param closed True if the chain should be created in closed state (i.e. pending purge) + * @return Transaction chain handle + */ + ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final boolean closed) { + final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); + final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); + Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, + existing); + return ret; + } + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { @@ -477,7 +530,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot()); + return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); @@ -518,14 +571,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } /** - * Immediately close all transaction chains. + * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled + * replication callbacks. */ - void closeAllTransactionChains() { + void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } transactionChains.clear(); + replicationCallbacks.clear(); } /** @@ -597,8 +652,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction) { - // Intentional no-op + void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final TransactionIdentifier id = transaction.getIdentifier(); + LOG.debug("{}: aborting transaction {}", logContext, id); + replicatePayload(id, AbortTransactionPayload.create(id), callback); + } + + + @Override + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + LOG.debug("{}: purging transaction {}", logContext, id); + replicatePayload(id, PurgeTransactionPayload.create(id), callback); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java index 47d07c0892..7db3a228c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java @@ -15,23 +15,52 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; abstract class ShardDataTreeMetadata> { + /** + * Apply a recovered metadata snapshot. + * + * @param snapshot Metadata snapshot + */ final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata snapshot) { Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot, getSupportedType()); doApplySnapshot(getSupportedType().cast(snapshot)); } + /** + * Reset metadata to empty state. + */ abstract void reset(); + /** + * Apply a recovered metadata snapshot. This is not a public entrypoint, just an interface between the base class + * and its subclasses. + * + * @param snapshot Metadata snapshot + */ abstract void doApplySnapshot(@Nonnull T snapshot); + /** + * Return the type of metadata snapshot this object supports. + * + * @return Metadata type + */ abstract @Nonnull Class getSupportedType(); + /** + * Take a snapshot of current metadata state. + * + * @return Metadata snapshot, or null if the metadata is empty. + */ abstract @Nullable T toSnapshot(); // Lifecycle events + + abstract void onTransactionAborted(TransactionIdentifier txId); + abstract void onTransactionCommitted(TransactionIdentifier txId); + abstract void onTransactionPurged(TransactionIdentifier txId); + abstract void onHistoryCreated(LocalHistoryIdentifier historyId); abstract void onHistoryClosed(LocalHistoryIdentifier historyId); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 2554151dd7..dfd6680045 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -55,7 +55,7 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent final DataTreeSnapshot snapshot = getSnapshot(); LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot); - return new ReadOnlyShardDataTreeTransaction(txId, snapshot); + return new ReadOnlyShardDataTreeTransaction(this, txId, snapshot); } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { @@ -72,17 +72,24 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - protected void abortTransaction(final AbstractShardDataTreeTransaction transaction) { + void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { if (transaction instanceof ReadWriteShardDataTreeTransaction) { Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction); - LOG.debug("Aborted transaction {}", transaction); + LOG.debug("Aborted open transaction {}", transaction); openTransaction = null; } + + dataTree.abortTransaction(transaction, callback); + } + + @Override + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + dataTree.purgeTransaction(id, callback); } @Override - protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 20f50156f8..2b0b74d02c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -12,9 +12,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification abstract class ShardDataTreeTransactionParent { - abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); + abstract void abortTransaction(AbstractShardDataTreeTransaction transaction, Runnable callback); + + abstract void purgeTransaction(TransactionIdentifier id, Runnable callback); abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod); + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index e3fbb82ee8..b4fee8e9c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -37,15 +37,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering private final ShardStats shardStats; private final TransactionIdentifier transactionId; - protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionId) { + protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats, + final TransactionIdentifier transactionId) { super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; this.shardStats = shardStats; this.transactionId = Preconditions.checkNotNull(transactionId); } - public static Props props(TransactionType type, AbstractShardDataTreeTransaction transaction, - ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) { + public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction transaction, + final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) { return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats)); } @@ -60,7 +61,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering } @Override - public void handleReceive(Object message) { + public void handleReceive(final Object message) { if (CloseTransaction.isSerializedType(message)) { closeTransaction(true); } else if (message instanceof ReceiveTimeout) { @@ -75,8 +76,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return true; } - private void closeTransaction(boolean sendReply) { - getDOMStoreTransaction().abort(); + private void closeTransaction(final boolean sendReply) { + getDOMStoreTransaction().abort(null); if (sendReply && returnCloseTransactionReply()) { getSender().tell(new CloseTransactionReply(), getSelf()); @@ -85,7 +86,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering getSelf().tell(PoisonPill.getInstance(), getSelf()); } - private boolean checkClosed(AbstractShardDataTreeTransaction transaction) { + private boolean checkClosed(final AbstractShardDataTreeTransaction transaction) { final boolean ret = transaction.isClosed(); if (ret) { shardStats.incrementFailedReadTransactionsCount(); @@ -95,7 +96,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return ret; } - protected void readData(AbstractShardDataTreeTransaction transaction, ReadData message) { + protected void readData(final AbstractShardDataTreeTransaction transaction, final ReadData message) { if (checkClosed(transaction)) { return; } @@ -106,7 +107,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering sender().tell(readDataReply.toSerializable(), self()); } - protected void dataExists(AbstractShardDataTreeTransaction transaction, DataExists message) { + protected void dataExists(final AbstractShardDataTreeTransaction transaction, final DataExists message) { if (checkClosed(transaction)) { return; } @@ -128,8 +129,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering final ShardStats shardStats; final TransactionType type; - ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction transaction, - ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) { + ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction transaction, + final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) { this.transaction = Preconditions.checkNotNull(transaction); this.shardActor = shardActor; this.shardStats = shardStats; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java index fe2588d577..f08ff4a445 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java @@ -8,7 +8,12 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.HashMap; +import java.util.Map; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -25,13 +30,27 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory { private final LocalHistoryIdentifier identifier; private final ShardDataTree tree; - StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId, - final ShardDataTree tree) { - super(persistenceId, ticker); + private StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, + final ShardDataTree tree, final Map closedTransactions, + final RangeSet purgedTransactions) { + super(persistenceId, tree, closedTransactions, purgedTransactions); this.identifier = new LocalHistoryIdentifier(clientId, 0); this.tree = Preconditions.checkNotNull(tree); } + static StandaloneFrontendHistory create(final String persistenceId, final ClientIdentifier clientId, + final ShardDataTree tree) { + return new StandaloneFrontendHistory(persistenceId, clientId, tree, ImmutableMap.of(), + TreeRangeSet.create()); + } + + static StandaloneFrontendHistory recreate(final String persistenceId, final ClientIdentifier clientId, + final ShardDataTree tree, final Map closedTransactions, + final RangeSet purgedTransactions) { + return new StandaloneFrontendHistory(persistenceId, clientId, tree, new HashMap<>(closedTransactions), + purgedTransactions); + } + @Override public LocalHistoryIdentifier getIdentifier() { return identifier; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java index b553bf92a2..751f5b21ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java @@ -7,11 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; +import com.google.common.base.Throwables; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import java.io.DataInput; import java.io.IOException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Payload persisted when a transaction is aborted. It contains the transaction identifier. @@ -45,15 +48,22 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload purgedTransactions; + private final Map closedTransactions; private final long historyId; private final long cookie; - private final long nextTransaction; private final boolean closed; - public FrontendHistoryMetadata(final long historyId, final long cookie, final long nextTransaction, - final boolean closed) { + public FrontendHistoryMetadata(final long historyId, final long cookie, final boolean closed, + final Map closedTransactions, final RangeSet purgedTransactions) { this.historyId = historyId; this.cookie = cookie; - this.nextTransaction = nextTransaction; this.closed = closed; + this.closedTransactions = ImmutableMap.copyOf(closedTransactions); + this.purgedTransactions = ImmutableRangeSet.copyOf(purgedTransactions); } public long getHistoryId() { @@ -36,34 +49,70 @@ public final class FrontendHistoryMetadata implements WritableObject { return cookie; } - public long getNextTransaction() { - return nextTransaction; - } - public boolean isClosed() { return closed; } + public Map getClosedTransactions() { + return closedTransactions; + } + + public RangeSet getPurgedTransactions() { + return purgedTransactions; + } + @Override public void writeTo(final DataOutput out) throws IOException { WritableObjects.writeLongs(out, historyId, cookie); - WritableObjects.writeLong(out, nextTransaction); out.writeBoolean(closed); + + final Set> purgedRanges = purgedTransactions.asRanges(); + WritableObjects.writeLongs(out, closedTransactions.size(), purgedRanges.size()); + for (Entry e : closedTransactions.entrySet()) { + WritableObjects.writeLong(out, e.getKey().longValue()); + out.writeBoolean(e.getValue().booleanValue()); + } + for (Range r : purgedRanges) { + WritableObjects.writeLongs(out, r.lowerEndpoint().longValue(), r.upperEndpoint().longValue()); + } } public static FrontendHistoryMetadata readFrom(final DataInput in) throws IOException { - final byte header = WritableObjects.readLongHeader(in); + byte header = WritableObjects.readLongHeader(in); final long historyId = WritableObjects.readFirstLong(in, header); final long cookie = WritableObjects.readSecondLong(in, header); - final long nextTransaction = WritableObjects.readLong(in); final boolean closed = in.readBoolean(); - return new FrontendHistoryMetadata(historyId, cookie, nextTransaction, closed); + header = WritableObjects.readLongHeader(in); + long ls = WritableObjects.readFirstLong(in, header); + Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE); + final int csize = (int) ls; + + ls = WritableObjects.readSecondLong(in, header); + Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE); + final int psize = (int) ls; + + final Map closedTransactions = new HashMap<>(csize); + for (int i = 0; i < csize; ++i) { + final UnsignedLong key = UnsignedLong.fromLongBits(WritableObjects.readLong(in)); + final Boolean value = Boolean.valueOf(in.readBoolean()); + closedTransactions.put(key, value); + } + final RangeSet purgedTransactions = TreeRangeSet.create(); + for (int i = 0; i < psize; ++i) { + final byte h = WritableObjects.readLongHeader(in); + final UnsignedLong l = UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, h)); + final UnsignedLong u = UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, h)); + purgedTransactions.add(Range.closed(l, u)); + } + + return new FrontendHistoryMetadata(historyId, cookie, closed, closedTransactions, purgedTransactions); } @Override public String toString() { return MoreObjects.toStringHelper(FrontendHistoryMetadata.class).add("historiId", historyId) - .add("cookie", cookie).add("nextTransaction", nextTransaction).add("closed", closed).toString(); + .add("cookie", cookie).add("closed", closed).add("closedTransactions", closedTransactions) + .add("purgedTransactions", purgedTransactions).toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java new file mode 100644 index 0000000000..71e794b4d4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload persisted when a transaction is purged from the frontend. It contains the transaction identifier. + * + * @author Robert Varga + */ +public final class PurgeTransactionPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException { + return TransactionIdentifier.readFrom(in); + } + + @Override + protected PurgeTransactionPayload createObject(final TransactionIdentifier identifier, + final byte[] serialized) { + return new PurgeTransactionPayload(identifier, serialized); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class); + private static final long serialVersionUID = 1L; + + PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) { + super(transactionId, serialized); + } + + public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + transactionId.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {}", transactionId, e); + throw Throwables.propagate(e); + } + return new PurgeTransactionPayload(transactionId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 6ccede2776..00087e37df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -70,7 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().abort(); + subject.underlyingActor().getDOMStoreTransaction().abort(null); future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); @@ -92,7 +92,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().abort(); + subject.underlyingActor().getDOMStoreTransaction().abort(null); future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); @@ -113,7 +113,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().abort(); + subject.underlyingActor().getDOMStoreTransaction().abort(null); future = akka.pattern.Patterns.ask(subject, new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java index aa2a9dbe10..43b2c9eaa9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java @@ -9,14 +9,13 @@ package org.opendaylight.controller.cluster.datastore.persisted; import static org.junit.Assert.assertEquals; -import java.io.IOException; import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractTest; public class AbortTransactionPayloadTest extends AbstractTest { @Test - public void testPayloadSerDes() throws IOException { + public void testPayloadSerDes() { final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId()); final AbortTransactionPayload cloned = SerializationUtils.clone(template); assertEquals(template.getIdentifier(), cloned.getIdentifier()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java index e175a09eb4..f8734850a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java @@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; @@ -90,7 +91,7 @@ public class FrontendShardDataTreeSnapshotMetadataTest { } private static FrontendShardDataTreeSnapshotMetadata createMetadataSnapshot(final int size) { - final List clients = new ArrayList<>(); + final List clients = new ArrayList<>(size); for (long i = 0; i < size; i++) { clients.add(createFrontedClientMetadata(i)); } @@ -107,8 +108,9 @@ public class FrontendShardDataTreeSnapshotMetadataTest { final RangeSet purgedHistories = TreeRangeSet.create(); purgedHistories.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.ONE)); - final Collection currentHistories = Collections - .singleton(new FrontendHistoryMetadata(num, num, num, true)); + final Collection currentHistories = Collections.singleton( + new FrontendHistoryMetadata(num, num, true, ImmutableMap.of(UnsignedLong.ZERO, Boolean.TRUE), + purgedHistories)); return new FrontendClientMetadata(clientIdentifier, purgedHistories, currentHistories); }