--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request for a transaction which has already
+ * been closed, either via a successful commit or abort (which is indicated via {@link #isSuccessful()}). This can
+ * happen if the corresponding journal record is replicated, but the message to the frontend gets lost and the backend
+ * leader moves on before the frontend retries the corresponding request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClosedTransactionException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ private final boolean successful;
+
+ public ClosedTransactionException(final boolean successful) {
+ super("Transaction has been " + (successful ? "committed" : "aborted"));
+ this.successful = successful;
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return false;
+ }
+
+ public boolean isSuccessful() {
+ return successful;
+ }
+}
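For orientation, a minimal sketch of how a backend-side check might surface this exception; the outcome map and its contents are hypothetical, only the exception contract comes from the class above.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.opendaylight.controller.cluster.access.concepts.RequestException;

final class ClosedTransactionCheck {
    // Hypothetical record of transactions closed by a previous leader: true = committed, false = aborted
    private final Map<Long, Boolean> closed = ImmutableMap.of(42L, Boolean.TRUE);

    void checkNotClosed(final long txId) throws RequestException {
        final Boolean outcome = closed.get(txId);
        if (outcome != null) {
            // Not retriable: the frontend inspects isSuccessful() to learn the final outcome
            throw new ClosedTransactionException(outcome.booleanValue());
        }
    }
}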
@Override
public boolean isRetriable() {
- return true;
+ return false;
}
}
package org.opendaylight.controller.cluster.access.commands;
import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
/**
public final class DeadTransactionException extends RequestException {
private static final long serialVersionUID = 1L;
- public DeadTransactionException(final long lastSeenTransaction) {
- super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for");
+ private final RangeSet<UnsignedLong> purgedIdentifiers;
+
+ public DeadTransactionException(final RangeSet<UnsignedLong> purgedIdentifiers) {
+ super("Transactions " + purgedIdentifiers + " have been purged");
+ this.purgedIdentifiers = ImmutableRangeSet.copyOf(purgedIdentifiers);
}
@Override
public boolean isRetriable() {
- return true;
+ return false;
+ }
+
+ public RangeSet<UnsignedLong> getPurgedIdentifier() {
+ return purgedIdentifiers;
}
}
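A quick sketch of constructing the exception from a mutable TreeRangeSet: because the constructor takes an ImmutableRangeSet.copyOf() snapshot, later mutations of the caller's set do not leak into the exception. The range values are invented.

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import com.google.common.primitives.UnsignedLong;

final class DeadTransactionExample {
    static DeadTransactionException example() {
        final RangeSet<UnsignedLong> purged = TreeRangeSet.create();
        purged.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.valueOf(5)));

        final DeadTransactionException ex = new DeadTransactionException(purged);
        purged.add(Range.singleton(UnsignedLong.valueOf(100))); // does not affect ex
        return ex;
    }
}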
package org.opendaylight.controller.cluster.access.commands;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableRangeSet;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.ABIVersion;
@Test
public void toRequestFailureTest() throws Exception {
- final RequestException exception = new DeadTransactionException(0);
+ final RequestException exception = new DeadTransactionException(ImmutableRangeSet.of());
final ConnectClientFailure failure = OBJECT.toRequestFailure(exception);
Assert.assertNotNull(failure);
}
*/
package org.opendaylight.controller.cluster.access.commands;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
@Override
protected void isRetriable() {
- assertTrue(OBJECT.isRetriable());
+ assertFalse(OBJECT.isRetriable());
}
@Override
protected void checkMessage() {
final String message = OBJECT.getMessage();
- assertTrue("Histories up to 100 are accounted for".equals(message));
+ assertEquals("Histories up to 100 are accounted for", message);
assertNull(OBJECT.getCause());
}
*/
package org.opendaylight.controller.cluster.access.commands;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableRangeSet;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
public class DeadTransactionExceptionTest extends RequestExceptionTest<DeadTransactionException> {
- private static final RequestException OBJECT = new DeadTransactionException(100);
+ private static final RequestException OBJECT = new DeadTransactionException(ImmutableRangeSet.of());
@Override
protected void isRetriable() {
- assertTrue(OBJECT.isRetriable());
+ assertFalse(OBJECT.isRetriable());
}
@Override
protected void checkMessage() {
final String message = OBJECT.getMessage();
- assertTrue("Transaction up to 100 are accounted for".equals(message));
+ assertEquals("Transactions [] have been purged", message);
assertNull(OBJECT.getCause());
}
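The expected message follows from Guava's rendering of an empty range set: ImmutableRangeSet.of().toString() yields "[]", so the constructor produces exactly the asserted text. A standalone check, with an invented non-empty set for contrast:

import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.Range;
import com.google.common.primitives.UnsignedLong;

final class MessageFormatCheck {
    public static void main(final String[] args) {
        // Matches the asserted message: "Transactions [] have been purged"
        System.out.println("Transactions " + ImmutableRangeSet.of() + " have been purged");
        // A non-empty set renders its ranges, e.g. [[7..7]]
        System.out.println(ImmutableRangeSet.of(Range.singleton(UnsignedLong.valueOf(7))));
    }
}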
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
+ private final RangeSet<UnsignedLong> purgedTransactions;
private final String persistenceId;
- private final Ticker ticker;
+ private final ShardDataTree tree;
- AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
+ /**
+ * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or
+ * aborted (false). We only ever shrink these.
+ */
+ private Map<UnsignedLong, Boolean> closedTransactions;
+
+ AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
+ final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
- this.ticker = Preconditions.checkNotNull(ticker);
+ this.tree = Preconditions.checkNotNull(tree);
+ this.closedTransactions = Preconditions.checkNotNull(closedTransactions);
+ this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions);
}
final String persistenceId() {
}
final long readTime() {
- return ticker.read();
+ return tree.ticker().read();
}
final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
final RequestEnvelope envelope, final long now) throws RequestException {
final TransactionIdentifier id = request.getTarget();
+ final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
- FrontendTransaction tx;
if (request instanceof TransactionPurgeRequest) {
- tx = transactions.remove(id);
+ if (purgedTransactions.contains(ul)) {
+ // Retransmitted purge request: nothing to do
+ LOG.debug("{}: transaction {} already purged", persistenceId, id);
+ return new TransactionPurgeResponse(id, request.getSequence());
+ }
+
+ // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
+ // to an ImmutableMap, which does not allow remove().
+ if (closedTransactions.containsKey(ul)) {
+ tree.purgeTransaction(id, () -> {
+ closedTransactions.remove(ul);
+ if (closedTransactions.isEmpty()) {
+ closedTransactions = ImmutableMap.of();
+ }
+
+ purgedTransactions.add(Range.singleton(ul));
+ LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
+ envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+ });
+ return null;
+ }
+
+ final FrontendTransaction tx = transactions.get(id);
if (tx == null) {
- // We have no record of the transaction, nothing to do
- LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id);
+ // This should never happen because the purge callback removes the transaction and records it as
+ // purged in one go. If it does happen, we warn about the situation and mark the transaction purged.
+ LOG.warn("{}: transaction {} not tracked in active transactions nor in purged transactions {}", persistenceId,
+ id, purgedTransactions);
+ purgedTransactions.add(Range.singleton(ul));
return new TransactionPurgeResponse(id, request.getSequence());
}
+
+ tx.purge(() -> {
+ purgedTransactions.add(Range.singleton(ul));
+ transactions.remove(id);
+ LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
+ envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+ });
+ return null;
+ }
+
+ if (purgedTransactions.contains(ul)) {
+ LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions);
+ throw new DeadTransactionException(purgedTransactions);
+ }
+ final Boolean closed = closedTransactions.get(ul);
+ if (closed != null) {
+ final boolean successful = closed.booleanValue();
+ LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful"
+ : "failed");
+ throw new ClosedTransactionException(successful);
+ }
+
+ FrontendTransaction tx = transactions.get(id);
+ if (tx == null) {
+ // The transaction does not exist and we are about to create it, check sequence number
+ if (request.getSequence() != 0) {
+ LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+ throw UNSEQUENCED_START;
+ }
+
+ tx = createTransaction(request, id);
+ transactions.put(id, tx);
} else {
- tx = transactions.get(id);
- if (tx == null) {
- // The transaction does not exist and we are about to create it, check sequence number
- if (request.getSequence() != 0) {
- LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
- throw UNSEQUENCED_START;
- }
-
- tx = createTransaction(request, id);
- transactions.put(id, tx);
- } else {
- final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
- if (maybeReplay.isPresent()) {
- final TransactionSuccess<?> replay = maybeReplay.get();
- LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
- return replay;
- }
+ final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+ if (maybeReplay.isPresent()) {
+ final TransactionSuccess<?> replay = maybeReplay.get();
+ LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+ return replay;
}
}
return tx.handleRequest(request, envelope, now);
}
+ void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+ LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+ tree.closeTransactionChain(getIdentifier(), () -> {
+ envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+ });
+ }
+
+ void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+ LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+ tree.purgeTransactionChain(getIdentifier(), () -> {
+ envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+ });
+ }
+
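The purge path above is deliberately asynchronous: no TransactionPurgeResponse goes out until the purge has been persisted and replicated, at which point the supplied callback fires. A condensed, self-contained sketch of that shape; the queue is a stand-in for the shard's replication machinery.

import java.util.ArrayDeque;
import java.util.Queue;

final class ReplicateThenReply {
    // Callbacks waiting for their payload to finish replicating
    private final Queue<Runnable> pending = new ArrayDeque<>();

    void purge(final Runnable callback) {
        pending.add(callback); // 1) remember what to do once replication completes
        // 2) ... persist and replicate the purge payload here ...
    }

    void payloadReplicationComplete() {
        pending.remove().run(); // 3) only now does the success response go out
    }
}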
private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
throws RequestException {
if (request instanceof CommitLocalTransactionRequest) {
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
 * Abstract base for transactions running on ShardDataTree.
@NotThreadSafe
abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
implements Identifiable<TransactionIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeTransaction.class);
+
+ private final ShardDataTreeTransactionParent parent;
private final TransactionIdentifier id;
private final T snapshot;
private boolean closed;
- AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
+ AbstractShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+ final T snapshot) {
+ this.parent = Preconditions.checkNotNull(parent);
this.snapshot = Preconditions.checkNotNull(snapshot);
this.id = Preconditions.checkNotNull(id);
}
return id;
}
+ final ShardDataTreeTransactionParent getParent() {
+ return parent;
+ }
+
final T getSnapshot() {
return snapshot;
}
return true;
}
+ final void abort(final Runnable callback) {
+ Preconditions.checkState(close(), "Transaction is already closed");
+ parent.abortTransaction(this, callback);
+ }
+
+ final void purge(final Runnable callback) {
+ if (!closed) {
+ LOG.warn("Purging unclosed transaction {}", id);
+ }
+ parent.purgeTransaction(id, callback);
+ }
+
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
.toString();
}
-
- abstract void abort();
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.Collections2;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
import org.opendaylight.yangtools.concepts.Builder;
import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@NotThreadSafe
final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+
private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
private final RangeSet<UnsignedLong> purgedHistories;
private final ClientIdentifier identifier;
}
void onHistoryCreated(final LocalHistoryIdentifier historyId) {
- // TODO Auto-generated method stub
-
+ final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+ final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+ if (oldMeta != null) {
+ // This should not be happening, warn about it
+ LOG.warn("Reused local history {}", historyId);
+ } else {
+ LOG.debug("Created local history {}", historyId);
+ }
}
void onHistoryClosed(final LocalHistoryIdentifier historyId) {
- ensureHistory(historyId).onHistoryClosed();
+ final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+ if (builder != null) {
+ builder.onHistoryClosed();
+ LOG.debug("Closed history {}", historyId);
+ } else {
+ LOG.warn("Closed unknown history {}, ignoring", historyId);
+ }
}
void onHistoryPurged(final LocalHistoryIdentifier historyId) {
- currentHistories.remove(historyId);
+ final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+ if (history == null) {
+ LOG.warn("Purging unknown history {}", historyId);
+ }
+
// XXX: do we need to account for cookies?
purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(historyId.getHistoryId())));
+ LOG.debug("Purged history {}", historyId);
+ }
+
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionAborted(txId);
+ LOG.debug("Committed transaction {}", txId);
+ } else {
+ LOG.warn("Unknown history for aborted transaction {}, ignoring", txId);
+ }
}
void onTransactionCommitted(final TransactionIdentifier txId) {
- ensureHistory(txId.getHistoryId()).onTransactionCommitted(txId);
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionCommitted(txId);
+ LOG.debug("Aborted transaction {}", txId);
+ } else {
+ LOG.warn("Unknown history for commited transaction {}, ignoring", txId);
+ }
+ }
+
+ void onTransactionPurged(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionPurged(txId);
+ LOG.debug("Purged transaction {}", txId);
+ } else {
+ LOG.warn("Unknown history for purged transaction {}, ignoring", txId);
+ }
+ }
+
+ /**
+ * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
+ *
+ * @param shard parent shard
+ * @return Leader frontend state
+ */
+ @Nonnull LeaderFrontendState toLeaderState(@Nonnull final Shard shard) {
+ // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
+ // interactions would get intertwined, leading to inconsistencies.
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
+ for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
+ if (e.getIdentifier().getHistoryId() != 0) {
+ final AbstractFrontendHistory state = e.toLeaderState(shard);
+ Verify.verify(state instanceof LocalFrontendHistory);
+ histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+ }
+ }
+
+ final AbstractFrontendHistory singleHistory;
+ final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
+ new LocalHistoryIdentifier(identifier, 0));
+ if (singleHistoryMeta == null) {
+ final ShardDataTree tree = shard.getDataStore();
+ singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
+ } else {
+ singleHistory = singleHistoryMeta.toLeaderState(shard);
+ }
+
+ return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
+ TreeRangeSet.create(purgedHistories), singleHistory, histories);
}
- private FrontendHistoryMetadataBuilder ensureHistory(final LocalHistoryIdentifier historyId) {
- return currentHistories.computeIfAbsent(historyId, FrontendHistoryMetadataBuilder::new);
+ private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
+ return currentHistories.get(txId.getHistoryId());
}
}
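toLeaderState copies every mutable structure (TreeRangeSet.create(purgedHistories), fresh HashMaps) rather than handing out views, per the note above. A small demonstration of why a copy matters, with invented values:

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import com.google.common.primitives.UnsignedLong;

final class CopyVersusView {
    public static void main(final String[] args) {
        final RangeSet<UnsignedLong> metadata = TreeRangeSet.create();
        metadata.add(Range.singleton(UnsignedLong.ONE));

        final RangeSet<UnsignedLong> leaderCopy = TreeRangeSet.create(metadata);
        metadata.add(Range.singleton(UnsignedLong.valueOf(2)));

        // The leader's copy is isolated from later metadata updates
        System.out.println(leaderCopy.contains(UnsignedLong.valueOf(2))); // false
    }
}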
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMetadata>,
Identifiable<LocalHistoryIdentifier> {
+
+ private final Map<UnsignedLong, Boolean> closedTransactions;
+ private final RangeSet<UnsignedLong> purgedTransactions;
private final LocalHistoryIdentifier identifier;
- private long nextTransaction;
private boolean closed;
FrontendHistoryMetadataBuilder(final LocalHistoryIdentifier identifier) {
this.identifier = Preconditions.checkNotNull(identifier);
+ this.purgedTransactions = TreeRangeSet.create();
+ this.closedTransactions = new HashMap<>(2);
}
FrontendHistoryMetadataBuilder(final ClientIdentifier clientId, final FrontendHistoryMetadata meta) {
identifier = new LocalHistoryIdentifier(clientId, meta.getHistoryId(), meta.getCookie());
- nextTransaction = meta.getNextTransaction();
+ closedTransactions = new HashMap<>(meta.getClosedTransactions());
+ purgedTransactions = TreeRangeSet.create(meta.getPurgedTransactions());
closed = meta.isClosed();
}
@Override
public FrontendHistoryMetadata build() {
- return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), nextTransaction, closed);
+ return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), closed,
+ closedTransactions, purgedTransactions);
}
void onHistoryClosed() {
+ Preconditions.checkState(identifier.getHistoryId() != 0);
closed = true;
}
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.FALSE);
+ }
+
void onTransactionCommitted(final TransactionIdentifier txId) {
- nextTransaction = txId.getTransactionId() + 1;
+ closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.TRUE);
+ }
+
+ void onTransactionPurged(final TransactionIdentifier txId) {
+ final UnsignedLong id = UnsignedLong.fromLongBits(txId.getTransactionId());
+ closedTransactions.remove(id);
+ purgedTransactions.add(Range.singleton(id));
+ }
+
+ /**
+ * Transform frontend metadata for a particular client history into its {@link LocalFrontendHistory} counterpart.
+ *
+ * @param shard parent shard
+ * @return Leader history state
+ */
+ @Nonnull AbstractFrontendHistory toLeaderState(@Nonnull final Shard shard) {
+ if (identifier.getHistoryId() == 0) {
+ return StandaloneFrontendHistory.recreate(shard.persistenceId(), identifier.getClientId(),
+ shard.getDataStore(), closedTransactions, purgedTransactions);
+ }
+
+ return LocalFrontendHistory.recreate(shard.persistenceId(), shard.getDataStore(),
+ shard.getDataStore().recreateTransactionChain(identifier, closed), closedTransactions, purgedTransactions);
}
}
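Taken together, the callbacks implement a small per-history lifecycle: commit or abort records the outcome in closedTransactions, and a later purge moves the identifier into purgedTransactions. A standalone walk-through with an invented transaction id:

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;

final class TransactionLifecycle {
    public static void main(final String[] args) {
        final Map<UnsignedLong, Boolean> closed = new HashMap<>();
        final RangeSet<UnsignedLong> purged = TreeRangeSet.create();
        final UnsignedLong txId = UnsignedLong.valueOf(17);

        closed.put(txId, Boolean.TRUE);     // onTransactionCommitted
        closed.remove(txId);                // onTransactionPurged ...
        purged.add(Range.singleton(txId));  // ... moves the id to the purged set

        System.out.println(closed.isEmpty() && purged.contains(txId)); // true
    }
}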
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(FrontendMetadata.class);
private final Map<FrontendIdentifier, FrontendClientMetadataBuilder> clients = new HashMap<>();
+ private final String shardName;
+
+ FrontendMetadata(final String shardName) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ }
@Override
Class<FrontendShardDataTreeSnapshotMetadata> getSupportedType() {
final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder(id);
final FrontendClientMetadataBuilder previous = clients.put(id.getFrontendId(), client);
if (previous != null) {
- LOG.debug("Replaced client {} with {}", previous, client);
+ LOG.debug("{}: Replaced client {} with {}", shardName, previous, client);
} else {
- LOG.debug("Added client {}", client);
+ LOG.debug("{}: Added client {}", shardName, client);
}
return client;
}
ensureClient(historyId.getClientId()).onHistoryPurged(historyId);
}
+ @Override
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ ensureClient(txId.getHistoryId().getClientId()).onTransactionAborted(txId);
+ }
+
@Override
void onTransactionCommitted(final TransactionIdentifier txId) {
ensureClient(txId.getHistoryId().getClientId()).onTransactionCommitted(txId);
}
+
+ @Override
+ void onTransactionPurged(final TransactionIdentifier txId) {
+ ensureClient(txId.getHistoryId().getClientId()).onTransactionPurged(txId);
+ }
+
+ /**
+ * Transform frontend metadata into an active leader state map.
+ *
+ * @return Leader frontend state
+ */
+ @Nonnull Map<FrontendIdentifier, LeaderFrontendState> toLeaderState(@Nonnull final Shard shard) {
+ return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard)));
+ }
}
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
} else if (request instanceof ReadTransactionRequest) {
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionAbortRequest) {
- return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
- } else if (request instanceof TransactionPurgeRequest) {
- // No-op for now
- return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ return null;
} else {
throw new UnsupportedRequestException(request);
}
}
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- openTransaction.abort();
- return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+ @Override
+ void purge(final Runnable callback) {
+ openTransaction.purge(callback);
+ }
+
+ private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
+ openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(request.getTarget(),
+ request.getSequence())));
}
private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
- } else if (request instanceof TransactionPurgeRequest) {
- // No-op for now
- return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ return null;
} else {
throw new UnsupportedRequestException(request);
}
}
+ @Override
+ void purge(final Runnable callback) {
+ openTransaction.purge(callback);
+ }
+
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
});
}
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+ private void handleTransactionAbort(final TransactionAbortRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
if (readyCohort == null) {
- openTransaction.abort();
- return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+ openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+ new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+ return;
}
readyCohort.abort(new FutureCallback<Void>() {
recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
}
});
- return null;
}
private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
switch (maybeProto.get()) {
case ABORT:
- openTransaction.abort();
+ openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
openTransaction = null;
- return replyModifySuccess(request.getSequence());
+ return null;
case READY:
ensureReady();
return replyModifySuccess(request.getSequence());
abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
RequestEnvelope envelope, long now) throws RequestException;
+ // Final request: needs routing to the data tree so it can persist a tombstone
+ abstract void purge(Runnable callback);
+
private void recordResponse(final long sequence, final Object response) {
if (replayQueue.isEmpty()) {
firstReplaySequence = sequence;
.add("lastPurgedSequence", lastPurgedSequence)
.toString();
}
}
private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
// Histories which have not been purged
- private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+ private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
// RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+ private final RangeSet<UnsignedLong> purgedHistories;
// Used for all standalone transactions
private final AbstractFrontendHistory standaloneHistory;
private long expectedTxSequence;
private Long lastSeenHistory = null;
-
// TODO: explicit failover notification
// Record the ActorRef for the originating actor and when we switch to being a leader send a notification
// to the frontend client -- that way it can immediately start sending requests
// - per-RequestException throw counters
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+ this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+ clientId, tree), new HashMap<>());
+ }
+
+ LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+ final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
- standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
+ this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+ this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+ this.localHistories = Preconditions.checkNotNull(localHistories);
}
@Override
lastSeenHistory = id.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
+ localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}
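On the "automatic merging" comment above: TreeRangeSet coalesces connected ranges on insertion, which is what keeps the purged-history bookkeeping minimal. A self-contained demonstration with invented values:

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import com.google.common.primitives.UnsignedLong;

final class RangeSetMerging {
    public static void main(final String[] args) {
        final RangeSet<UnsignedLong> purged = TreeRangeSet.create();
        purged.add(Range.closed(UnsignedLong.valueOf(1), UnsignedLong.valueOf(5)));
        purged.add(Range.closed(UnsignedLong.valueOf(5), UnsignedLong.valueOf(9)));

        // The two connected ranges are coalesced into a single [1..9] entry
        System.out.println(purged.asRanges().size()); // 1
    }
}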
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
-import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @author Robert Varga
*/
final class LocalFrontendHistory extends AbstractFrontendHistory {
- private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
-
private final ShardDataTreeTransactionChain chain;
- private final ShardDataTree tree;
-
- private Long lastSeenTransaction;
- LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
- final ShardDataTreeTransactionChain chain) {
- super(persistenceId, tree.ticker());
- this.tree = Preconditions.checkNotNull(tree);
+ private LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
+ final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+ final RangeSet<UnsignedLong> purgedTransactions) {
+ super(persistenceId, tree, closedTransactions, purgedTransactions);
this.chain = Preconditions.checkNotNull(chain);
}
+ static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
+ final LocalHistoryIdentifier historyId) {
+ return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
+ TreeRangeSet.create());
+ }
+
+ static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
+ final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+ final RangeSet<UnsignedLong> purgedTransactions) {
+ return new LocalFrontendHistory(persistenceId, tree, chain, new HashMap<>(closedTransactions),
+ TreeRangeSet.create(purgedTransactions));
+ }
+
@Override
public LocalHistoryIdentifier getIdentifier() {
return chain.getIdentifier();
@Override
FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
- checkDeadTransaction(id);
- lastSeenTransaction = id.getTransactionId();
return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id));
}
@Override
FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
- checkDeadTransaction(id);
- lastSeenTransaction = id.getTransactionId();
return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id));
}
@Override
FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
throws RequestException {
- checkDeadTransaction(id);
- lastSeenTransaction = id.getTransactionId();
return FrontendReadWriteTransaction.createReady(this, id, mod);
}
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
return chain.createReadyCohort(id, mod);
}
-
- void destroy(final long sequence, final RequestEnvelope envelope, final long now)
- throws RequestException {
- LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
- tree.closeTransactionChain(getIdentifier(), () -> {
- envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
- });
- }
-
- void purge(final long sequence, final RequestEnvelope envelope, final long now) {
- LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
- tree.purgeTransactionChain(getIdentifier(), () -> {
- envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
- });
- }
-
- private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
- // FIXME: check if this history is still open
- // FIXME: check if the last transaction has been submitted
-
- // Transaction identifiers within a local history have to have increasing IDs
- if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) {
- throw new DeadTransactionException(lastSeenTransaction);
- }
- }
}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
- ReadOnlyShardDataTreeTransaction(final TransactionIdentifier id, final DataTreeSnapshot snapshot) {
- super(id, snapshot);
- }
-
- @Override
- void abort() {
- close();
+ ReadOnlyShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+ final DataTreeSnapshot snapshot) {
+ super(parent, id, snapshot);
}
}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
- private final ShardDataTreeTransactionParent parent;
- ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
- final TransactionIdentifier id, final DataTreeModification modification) {
- super(id, modification);
- this.parent = Preconditions.checkNotNull(parent);
- }
-
- @Override
- void abort() {
- Preconditions.checkState(close(), "Transaction is already closed");
-
- parent.abortTransaction(this);
+ ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+ final DataTreeModification modification) {
+ super(parent, id, modification);
}
ShardDataTreeCohort ready() {
Preconditions.checkState(close(), "Transaction is already closed");
-
- return parent.finishTransaction(this);
+ return getParent().finishTransaction(this);
}
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata = new FrontendMetadata();
- private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+ private final FrontendMetadata frontendMetadata;
+ private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
this.name = builder.getId().toString();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ this.frontendMetadata = new FrontendMetadata(name);
setPersistence(datastoreContext.isPersistent());
persistenceId(), getId());
}
- store.closeAllTransactionChains();
+ store.purgeLeaderState();
}
if (hasLeader && !isIsolatedLeader()) {
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- boolean hasLeader = hasLeader();
- if (hasLeader && !isLeader()) {
+ final boolean hasLeader = hasLeader();
+ if (!hasLeader) {
+ // No leader implies we are not the leader, lose frontend state if we have any. This also places
+ // an explicit guard so the map will not get modified accidentally.
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+ return;
+ }
+
+ if (!isLeader()) {
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ "change and the leader address isn't available.", this);
}
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
- if (hasLeader && !isIsolatedLeader()) {
+ if (!isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
+ } else if (payload instanceof AbortTransactionPayload) {
+ allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeTransactionPayload) {
+ allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
} else if (payload instanceof CreateLocalHistoryPayload) {
allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof CloseLocalHistoryPayload) {
Verify.verify(identifier instanceof TransactionIdentifier);
payloadReplicationComplete((TransactionIdentifier) identifier);
}
+ } else if (payload instanceof AbortTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((AbortTransactionPayload) payload);
+ } else {
+ allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof PurgeTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeTransactionPayload) payload);
+ } else {
+ allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+ } else {
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ }
} else if (payload instanceof CloseLocalHistoryPayload) {
if (identifier != null) {
payloadReplicationComplete((CloseLocalHistoryPayload) payload);
finishCommit(current.cohort);
}
+ private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionAborted(txId);
+ }
+ }
+
private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
for (ShardDataTreeMetadata<?> m : metadata) {
m.onTransactionCommitted(txId);
}
}
+ private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionPurged(txId);
+ }
+ }
+
private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
for (ShardDataTreeMetadata<?> m : metadata) {
m.onHistoryCreated(historyId);
}
}
+ /**
+ * Create a transaction chain for the specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+ * this method is used for re-establishing state when we are taking over the leadership.
+ *
+ * @param historyId Local history identifier
+ * @param closed True if the chain should be created in closed state (i.e. pending purge)
+ * @return Transaction chain handle
+ */
+ ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final boolean closed) {
+ final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+ final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+ Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
+ existing);
+ return ret;
+ }
+
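The contrast with ensureTransactionChain below is intentional: recreation happens during leader takeover and must not find an existing chain, so it fails fast instead of reusing one. A schematic of the two behaviors; the String-keyed map stands in for transactionChains:

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;

final class ChainBookkeeping {
    private final Map<String, Object> chains = new HashMap<>();

    Object ensureChain(final String id) {
        // Normal path: create on demand, reuse on subsequent calls
        return chains.computeIfAbsent(id, key -> new Object());
    }

    Object recreateChain(final String id) {
        // Takeover path: the chain must not exist yet
        final Object chain = new Object();
        Preconditions.checkState(chains.putIfAbsent(id, chain) == null,
            "Attempted to recreate chain %s", id);
        return chain;
    }
}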
ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
if (txId.getHistoryId().getHistoryId() == 0) {
- return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+ return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
}
/**
- * Immediately close all transaction chains.
+ * Immediately purge all state relevant to leadership. This includes all transaction chains and any scheduled
+ * replication callbacks.
*/
- void closeAllTransactionChains() {
+ void purgeLeaderState() {
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
}
transactionChains.clear();
+ replicationCallbacks.clear();
}
/**
}
@Override
- void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
- // Intentional no-op
+ void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+ final TransactionIdentifier id = transaction.getIdentifier();
+ LOG.debug("{}: aborting transaction {}", logContext, id);
+ replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ }
+
+ @Override
+ void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ LOG.debug("{}: purging transaction {}", logContext, id);
+ replicatePayload(id, PurgeTransactionPayload.create(id), callback);
}
@Override
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
+ /**
+ * Apply a recovered metadata snapshot.
+ *
+ * @param snapshot Metadata snapshot
+ */
final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata<?> snapshot) {
Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot,
getSupportedType());
doApplySnapshot(getSupportedType().cast(snapshot));
}
+ /**
+ * Reset metadata to empty state.
+ */
abstract void reset();
+ /**
+ * Apply a recovered metadata snapshot. This is not a public entrypoint, just an interface between the base class
+ * and its subclasses.
+ *
+ * @param snapshot Metadata snapshot
+ */
abstract void doApplySnapshot(@Nonnull T snapshot);
+ /**
+ * Return the type of metadata snapshot this object supports.
+ *
+ * @return Metadata type
+ */
abstract @Nonnull Class<T> getSupportedType();
+ /**
+ * Take a snapshot of current metadata state.
+ *
+ * @return Metadata snapshot, or null if the metadata is empty.
+ */
abstract @Nullable T toSnapshot();
// Lifecycle events
+
+ abstract void onTransactionAborted(TransactionIdentifier txId);
+
abstract void onTransactionCommitted(TransactionIdentifier txId);
+ abstract void onTransactionPurged(TransactionIdentifier txId);
+
abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
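For orientation, a skeletal subclass showing the override surface documented above; the bodies are deliberate no-ops and onHistoryPurged (not shown in this hunk) is omitted, so treat this purely as a sketch.

import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;

final class NoopMetadata extends ShardDataTreeMetadata<FrontendShardDataTreeSnapshotMetadata> {
    @Override
    void reset() {
        // No state to clear
    }

    @Override
    void doApplySnapshot(final FrontendShardDataTreeSnapshotMetadata snapshot) {
        // Nothing to restore
    }

    @Override
    Class<FrontendShardDataTreeSnapshotMetadata> getSupportedType() {
        return FrontendShardDataTreeSnapshotMetadata.class;
    }

    @Override
    FrontendShardDataTreeSnapshotMetadata toSnapshot() {
        return null; // empty metadata contributes no snapshot
    }

    @Override
    void onTransactionAborted(final TransactionIdentifier txId) { }

    @Override
    void onTransactionCommitted(final TransactionIdentifier txId) { }

    @Override
    void onTransactionPurged(final TransactionIdentifier txId) { }

    @Override
    void onHistoryCreated(final LocalHistoryIdentifier historyId) { }

    @Override
    void onHistoryClosed(final LocalHistoryIdentifier historyId) { }
}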
final DataTreeSnapshot snapshot = getSnapshot();
LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
- return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+ return new ReadOnlyShardDataTreeTransaction(this, txId, snapshot);
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
}
@Override
- protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+ void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
if (transaction instanceof ReadWriteShardDataTreeTransaction) {
Preconditions.checkState(openTransaction != null,
"Attempted to abort transaction %s while none is outstanding", transaction);
- LOG.debug("Aborted transaction {}", transaction);
+ LOG.debug("Aborted open transaction {}", transaction);
openTransaction = null;
}
+
+ dataTree.abortTransaction(transaction, callback);
+ }
+
+ @Override
+ void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ dataTree.purgeTransaction(id, callback);
}
@Override
- protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
Preconditions.checkState(openTransaction != null,
"Attempted to finish transaction %s while none is outstanding", transaction);
abstract class ShardDataTreeTransactionParent {
- abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+ abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
+
+ abstract void purgeTransaction(TransactionIdentifier id, Runnable callback);
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
}
private final ShardStats shardStats;
private final TransactionIdentifier transactionId;
- protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionId) {
+ protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
+ final TransactionIdentifier transactionId) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.shardStats = shardStats;
this.transactionId = Preconditions.checkNotNull(transactionId);
}
- public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
- ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+ public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+ final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
}
}
@Override
- public void handleReceive(Object message) {
+ public void handleReceive(final Object message) {
if (CloseTransaction.isSerializedType(message)) {
closeTransaction(true);
} else if (message instanceof ReceiveTimeout) {
return true;
}
- private void closeTransaction(boolean sendReply) {
- getDOMStoreTransaction().abort();
+ private void closeTransaction(final boolean sendReply) {
+ getDOMStoreTransaction().abort(null);
if (sendReply && returnCloseTransactionReply()) {
getSender().tell(new CloseTransactionReply(), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+ private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
final boolean ret = transaction.isClosed();
if (ret) {
shardStats.incrementFailedReadTransactionsCount();
return ret;
}
- protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
+ protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
if (checkClosed(transaction)) {
return;
}
sender().tell(readDataReply.toSerializable(), self());
}
- protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
+ protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
if (checkClosed(transaction)) {
return;
}
final ShardStats shardStats;
final TransactionType type;
- ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
- ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+ ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+ final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
this.transaction = Preconditions.checkNotNull(transaction);
this.shardActor = shardActor;
this.shardStats = shardStats;
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
private final LocalHistoryIdentifier identifier;
private final ShardDataTree tree;
- StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
- final ShardDataTree tree) {
- super(persistenceId, ticker);
+ private StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId,
+ final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+ final RangeSet<UnsignedLong> purgedTransactions) {
+ super(persistenceId, tree, closedTransactions, purgedTransactions);
this.identifier = new LocalHistoryIdentifier(clientId, 0);
this.tree = Preconditions.checkNotNull(tree);
}
+ static StandaloneFrontendHistory create(final String persistenceId, final ClientIdentifier clientId,
+ final ShardDataTree tree) {
+ return new StandaloneFrontendHistory(persistenceId, clientId, tree, ImmutableMap.of(),
+ TreeRangeSet.create());
+ }
+
+ static StandaloneFrontendHistory recreate(final String persistenceId, final ClientIdentifier clientId,
+ final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+ final RangeSet<UnsignedLong> purgedTransactions) {
+ return new StandaloneFrontendHistory(persistenceId, clientId, tree, new HashMap<>(closedTransactions),
+ purgedTransactions);
+ }
+
@Override
public LocalHistoryIdentifier getIdentifier() {
return identifier;
*/
package org.opendaylight.controller.cluster.datastore.persisted;
+import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
import java.io.IOException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Payload persisted when a transaction is aborted. It contains the transaction identifier.
}
}
+ private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
private static final long serialVersionUID = 1L;
AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
super(transactionId, serialized);
}
- public static AbortTransactionPayload create(final TransactionIdentifier transactionId) throws IOException {
+ public static AbortTransactionPayload create(final TransactionIdentifier transactionId) {
final ByteArrayDataOutput out = ByteStreams.newDataOutput();
- transactionId.writeTo(out);
+ try {
+ transactionId.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen
+ LOG.error("Failed to serialize {}", transactionId, e);
+ throw Throwables.propagate(e);
+ }
return new AbortTransactionPayload(transactionId, out.toByteArray());
}
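The try/catch is needed only because WritableObject.writeTo() accepts a generic DataOutput and therefore declares IOException; Guava's ByteArrayDataOutput writes to a heap buffer and never actually throws. A self-contained sketch of the same pattern (hypothetical class, not part of this change):

    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;
    import java.io.DataOutput;
    import java.io.IOException;

    final class InMemoryWriteSketch {
        // Declares IOException only because the DataOutput contract does.
        private static void writeTo(final DataOutput out) throws IOException {
            out.writeLong(42L);
        }

        static byte[] serialize() {
            final ByteArrayDataOutput out = ByteStreams.newDataOutput();
            try {
                writeTo(out);
            } catch (IOException e) {
                // Unreachable for an in-memory sink; fail loudly if it ever happens.
                throw new IllegalStateException("Failed to serialize", e);
            }
            return out.toByteArray();
        }
    }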
package org.opendaylight.controller.cluster.datastore.persisted;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.opendaylight.yangtools.concepts.WritableObject;
import org.opendaylight.yangtools.concepts.WritableObjects;
public final class FrontendHistoryMetadata implements WritableObject {
+ private final RangeSet<UnsignedLong> purgedTransactions;
+ private final Map<UnsignedLong, Boolean> closedTransactions;
private final long historyId;
private final long cookie;
- private final long nextTransaction;
private final boolean closed;
- public FrontendHistoryMetadata(final long historyId, final long cookie, final long nextTransaction,
- final boolean closed) {
+ public FrontendHistoryMetadata(final long historyId, final long cookie, final boolean closed,
+ final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
this.historyId = historyId;
this.cookie = cookie;
- this.nextTransaction = nextTransaction;
this.closed = closed;
+ this.closedTransactions = ImmutableMap.copyOf(closedTransactions);
+ this.purgedTransactions = ImmutableRangeSet.copyOf(purgedTransactions);
}
    public long getHistoryId() {
        return historyId;
    }

    public long getCookie() {
        return cookie;
    }
- public long getNextTransaction() {
- return nextTransaction;
- }
-
public boolean isClosed() {
return closed;
}
+ public Map<UnsignedLong, Boolean> getClosedTransactions() {
+ return closedTransactions;
+ }
+
+ public RangeSet<UnsignedLong> getPurgedTransactions() {
+ return purgedTransactions;
+ }
+
@Override
public void writeTo(final DataOutput out) throws IOException {
WritableObjects.writeLongs(out, historyId, cookie);
- WritableObjects.writeLong(out, nextTransaction);
out.writeBoolean(closed);
+
+ final Set<Range<UnsignedLong>> purgedRanges = purgedTransactions.asRanges();
+ WritableObjects.writeLongs(out, closedTransactions.size(), purgedRanges.size());
+ for (Entry<UnsignedLong, Boolean> e : closedTransactions.entrySet()) {
+ WritableObjects.writeLong(out, e.getKey().longValue());
+ out.writeBoolean(e.getValue().booleanValue());
+ }
+ for (Range<UnsignedLong> r : purgedRanges) {
+ WritableObjects.writeLongs(out, r.lowerEndpoint().longValue(), r.upperEndpoint().longValue());
+ }
}
public static FrontendHistoryMetadata readFrom(final DataInput in) throws IOException {
- final byte header = WritableObjects.readLongHeader(in);
+ byte header = WritableObjects.readLongHeader(in);
final long historyId = WritableObjects.readFirstLong(in, header);
final long cookie = WritableObjects.readSecondLong(in, header);
- final long nextTransaction = WritableObjects.readLong(in);
final boolean closed = in.readBoolean();
- return new FrontendHistoryMetadata(historyId, cookie, nextTransaction, closed);
+ header = WritableObjects.readLongHeader(in);
+ long ls = WritableObjects.readFirstLong(in, header);
+ Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+ final int csize = (int) ls;
+
+ ls = WritableObjects.readSecondLong(in, header);
+ Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+ final int psize = (int) ls;
+
+ final Map<UnsignedLong, Boolean> closedTransactions = new HashMap<>(csize);
+ for (int i = 0; i < csize; ++i) {
+ final UnsignedLong key = UnsignedLong.fromLongBits(WritableObjects.readLong(in));
+ final Boolean value = Boolean.valueOf(in.readBoolean());
+ closedTransactions.put(key, value);
+ }
+ final RangeSet<UnsignedLong> purgedTransactions = TreeRangeSet.create();
+ for (int i = 0; i < psize; ++i) {
+ final byte h = WritableObjects.readLongHeader(in);
+ final UnsignedLong l = UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, h));
+ final UnsignedLong u = UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, h));
+ purgedTransactions.add(Range.closed(l, u));
+ }
+
+ return new FrontendHistoryMetadata(historyId, cookie, closed, closedTransactions, purgedTransactions);
}
@Override
public String toString() {
        return MoreObjects.toStringHelper(FrontendHistoryMetadata.class).add("historyId", historyId)
- .add("cookie", cookie).add("nextTransaction", nextTransaction).add("closed", closed).toString();
+ .add("cookie", cookie).add("closed", closed).add("closedTransactions", closedTransactions)
+ .add("purgedTransactions", purgedTransactions).toString();
}
}
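The new wire format is: two packed longs (historyId, cookie), one boolean (closed), two packed sizes, then the closed-transaction entries followed by the closed ranges of purged identifiers. A hedged round-trip sketch of the format (test scaffolding assumed, not part of this change; uses the same Guava imports as the class above):

    // Assumes FrontendHistoryMetadata is on the classpath.
    static FrontendHistoryMetadata roundTrip(final FrontendHistoryMetadata meta) throws IOException {
        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
        meta.writeTo(out);
        return FrontendHistoryMetadata.readFrom(ByteStreams.newDataInput(out.toByteArray()));
    }

    // Ranges must be closed on both endpoints, matching writeTo()'s use of
    // lowerEndpoint()/upperEndpoint().
    final RangeSet<UnsignedLong> purged = TreeRangeSet.create();
    purged.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.valueOf(4)));
    final FrontendHistoryMetadata meta = new FrontendHistoryMetadata(1, 0, false,
        ImmutableMap.of(UnsignedLong.valueOf(5), Boolean.TRUE), purged);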
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a transaction is purged from the frontend. It contains the transaction identifier.
+ *
+ * @author Robert Varga
+ */
+public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<TransactionIdentifier> {
+ private static final class Proxy extends AbstractProxy<TransactionIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant, but it is not: Java serialization
+ // requires it in order to instantiate this proxy via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final byte[] serialized) {
+ super(serialized);
+ }
+
+ @Override
+ protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException {
+ return TransactionIdentifier.readFrom(in);
+ }
+
+ @Override
+ protected PurgeTransactionPayload createObject(final TransactionIdentifier identifier,
+ final byte[] serialized) {
+ return new PurgeTransactionPayload(identifier, serialized);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
+ private static final long serialVersionUID = 1L;
+
+ PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
+ super(transactionId, serialized);
+ }
+
+ public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ transactionId.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen
+ LOG.error("Failed to serialize {}", transactionId, e);
+ throw Throwables.propagate(e);
+ }
+ return new PurgeTransactionPayload(transactionId, out.toByteArray());
+ }
+
+ @Override
+ protected Proxy externalizableProxy(final byte[] serialized) {
+ return new Proxy(serialized);
+ }
+}
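A serialization test for the new payload could mirror AbortTransactionPayloadTest below; a sketch assuming the same AbstractTest scaffolding:

    import static org.junit.Assert.assertEquals;

    import org.apache.commons.lang3.SerializationUtils;
    import org.junit.Test;
    import org.opendaylight.controller.cluster.datastore.AbstractTest;

    public class PurgeTransactionPayloadTest extends AbstractTest {
        @Test
        public void testPayloadSerDes() {
            final PurgeTransactionPayload template = PurgeTransactionPayload.create(nextTransactionId());
            final PurgeTransactionPayload cloned = SerializationUtils.clone(template);
            assertEquals(template.getIdentifier(), cloned.getIdentifier());
        }
    }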
new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort();
+ subject.underlyingActor().getDOMStoreTransaction().abort(null);
future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
DataStoreVersions.CURRENT_VERSION), 3000);
new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort();
+ subject.underlyingActor().getDOMStoreTransaction().abort(null);
future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
DataStoreVersions.CURRENT_VERSION), 3000);
new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort();
+ subject.underlyingActor().getDOMStoreTransaction().abort(null);
future = akka.pattern.Patterns.ask(subject,
new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
import static org.junit.Assert.assertEquals;
-import java.io.IOException;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
public class AbortTransactionPayloadTest extends AbstractTest {
@Test
- public void testPayloadSerDes() throws IOException {
+ public void testPayloadSerDes() {
final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId());
final AbortTransactionPayload cloned = SerializationUtils.clone(template);
assertEquals(template.getIdentifier(), cloned.getIdentifier());
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
}
private static FrontendShardDataTreeSnapshotMetadata createMetadataSnapshot(final int size) {
- final List<FrontendClientMetadata> clients = new ArrayList<>();
+ final List<FrontendClientMetadata> clients = new ArrayList<>(size);
for (long i = 0; i < size; i++) {
clients.add(createFrontedClientMetadata(i));
}
final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
purgedHistories.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.ONE));
- final Collection<FrontendHistoryMetadata> currentHistories = Collections
- .singleton(new FrontendHistoryMetadata(num, num, num, true));
+ final Collection<FrontendHistoryMetadata> currentHistories = Collections.singleton(
+ new FrontendHistoryMetadata(num, num, true, ImmutableMap.of(UnsignedLong.ZERO, Boolean.TRUE),
+ purgedHistories));
return new FrontendClientMetadata(clientIdentifier, purgedHistories, currentHistories);
}