From de64c6bbf2d5aeb51f4036f9dd606a9bf6f71afb Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Wed, 25 May 2016 17:44:53 +0200
Subject: [PATCH] BUG-5280: add {Create,Close,Purge}LocalHistoryPayload

This patch introduces three new payloads that replicate local history
lifecycle events to followers. These are persisted whenever a transaction
chain (i.e. a local history) is created, cleanly closed, or purged on the
shard leader. Followers can use these to track transaction chains and pick
up processing in case of a leader failover.

Change-Id: I3fe5ac153c88f23f9b871bd23cb04a8e2410af91
Signed-off-by: Robert Varga
---
 .../FrontendClientMetadataBuilder.java        |   5 +
 .../cluster/datastore/FrontendMetadata.java   |   5 +
 .../datastore/LeaderFrontendState.java        |  35 ++---
 .../datastore/LocalFrontendHistory.java       |  38 +++--
 .../controller/cluster/datastore/Shard.java   |  12 +-
 .../cluster/datastore/ShardDataTree.java      | 141 +++++++++++++++---
 .../datastore/ShardDataTreeMetadata.java      |   3 +
 .../ShardDataTreeTransactionChain.java        |   2 +
 .../persisted/CloseLocalHistoryPayload.java   |  74 +++++++++
 .../persisted/CreateLocalHistoryPayload.java  |  74 +++++++++
 .../persisted/PurgeLocalHistoryPayload.java   |  75 ++++++++++
 .../cluster/datastore/ShardTest.java          |   9 +-
 12 files changed, 405 insertions(+), 68 deletions(-)
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java index 15e4304a46..cd1235b460 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java @@ -54,6 +54,11 @@ final class FrontendClientMetadataBuilder implements Builder { if (request instanceof CreateLocalHistoryRequest) { return handleCreateHistory((CreateLocalHistoryRequest) request); } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request, now); + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request, now); + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); } else { throw new UnsupportedRequestException(request); } @@ -133,12 +133,13 @@ final class LeaderFrontendState implements Identifiable { lastSeenHistory = id.getHistoryId(); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id))); + localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id))); LOG.debug("{}: created history {}", persistenceId, id); return new LocalHistorySuccess(id, request.getSequence()); } -
private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now) + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.get(id); @@ -148,29 +149,23 @@ final class LeaderFrontendState implements Identifiable { return new LocalHistorySuccess(id, request.getSequence()); } - return existing.destroy(request.getSequence(), now); + existing.destroy(request.getSequence(), envelope, now); + return null; } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now) - throws RequestException { + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.remove(id); - if (existing != null) { - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); - - if (!existing.isDestroyed()) { - LOG.warn("{}: purging undestroyed history {}", persistenceId, id); - existing.destroy(request.getSequence(), now); - } - - // FIXME: record a PURGE tombstone in the journal - - LOG.debug("{}: purged history {}", persistenceId, id); - } else { + if (existing == null) { LOG.debug("{}: history {} has already been purged", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); } - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: purging history {}", persistenceId, id); + purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + existing.purge(request.getSequence(), envelope, now); + return null; } @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java index cd0cc30a09..8e32c76ba7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -8,10 +8,10 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -25,20 +25,17 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ final class LocalFrontendHistory extends AbstractFrontendHistory { - private enum State { - OPEN, - CLOSED, - } - private static final Logger LOG = 
LoggerFactory.getLogger(LocalFrontendHistory.class); private final ShardDataTreeTransactionChain chain; + private final ShardDataTree tree; private Long lastSeenTransaction; - private State state = State.OPEN; - LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) { - super(persistenceId, ticker); + LocalFrontendHistory(final String persistenceId, final ShardDataTree tree, + final ShardDataTreeTransactionChain chain) { + super(persistenceId, tree.ticker()); + this.tree = Preconditions.checkNotNull(tree); this.chain = Preconditions.checkNotNull(chain); } @@ -74,20 +71,19 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { return chain.createReadyCohort(id, mod); } - LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException { - if (state != State.CLOSED) { - LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); - - // FIXME: add any finalization as needed - state = State.CLOSED; - } - - // FIXME: record a DESTROY tombstone in the journal - return new LocalHistorySuccess(getIdentifier(), sequence); + void destroy(final long sequence, final RequestEnvelope envelope, final long now) + throws RequestException { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + tree.closeTransactionChain(getIdentifier(), () -> { + envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); + }); } - boolean isDestroyed() { - return state == State.CLOSED; + void purge(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); + tree.purgeTransactionChain(getIdentifier(), () -> { + envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); + }); } private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 26295fe852..cb072b5c6a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -453,13 +454,13 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload - void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) { + void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); if (canSkipPayload) { - 
applyState(self(), transactionId, payload); + applyState(self(), id, payload); } else { // We are faking the sender - persistData(self(), transactionId, payload, batchHint); + persistData(self(), id, payload, batchHint); } } @@ -614,7 +615,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionId(), getSender()); } - void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) { + void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -630,7 +631,8 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - store.closeTransactionChain(closeTransactionChain.getIdentifier()); + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null)); } @SuppressWarnings("checkstyle:IllegalCatch") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index d398afefa7..613f9adbc9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -39,13 +39,18 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; +import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; @@ -102,10 +107,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); private final Map transactionChains = new HashMap<>(); + private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); + + /** + * Callbacks that need to be invoked once a 
payload is replicated. + */ + private final Map replicationCallbacks = new HashMap<>(); + private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; @@ -150,8 +162,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), - new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); } final String logContext() { @@ -312,6 +323,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof CreateLocalHistoryPayload) { + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload) { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload) { + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof DataTreeCandidatePayload) { applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { @@ -367,11 +384,46 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(identifier instanceof TransactionIdentifier); payloadReplicationComplete((TransactionIdentifier) identifier); } + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } else { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } + } else if (payload instanceof CreateLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CreateLocalHistoryPayload)payload); + } else { + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } + } else if (payload instanceof PurgeLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeLocalHistoryPayload)payload); + } else { + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } } + private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) { + if (callback != null) { + replicationCallbacks.put(payload, callback); + } + shard.persistPayload(id, payload, true); + } + + private void payloadReplicationComplete(final AbstractIdentifiablePayload payload) { + final Runnable callback = replicationCallbacks.remove(payload); + if (callback != null) { + LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback); + callback.run(); + } else { + LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier()); + } + } + private void payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { @@ -394,11 +446,30 @@ public class 
ShardDataTree extends ShardDataTreeTransactionParent { } } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { - ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); + private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryCreated(historyId); + } + } + + private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryClosed(historyId); + } + } + + private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryPurged(historyId); + } + } + + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { + ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { - chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); - transactionChains.put(localHistoryIdentifier, chain); + chain = new ShardDataTreeTransactionChain(historyId, this); + transactionChains.put(historyId, chain); + shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); } return chain; @@ -446,6 +517,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + /** + * Immediately close all transaction chains. + */ void closeAllTransactionChains() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); @@ -454,13 +528,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { transactionChains.clear(); } - void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) { - final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId); - if (chain != null) { - chain.close(); - } else { - LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId); + /** + * Close a single transaction chain. + * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; } + + chain.close(); + replicatePayload(id, CloseLocalHistoryPayload.create(id), callback); + } + + /** + * Purge a single transaction chain. 
+ * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.remove(id); + if (chain == null) { + LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } Entry>>, @@ -570,7 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = ticker().read(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), @@ -600,7 +704,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void processNextPending(Queue queue, State allowedState, Consumer processor) { + private void processNextPending(final Queue queue, final State allowedState, + final Consumer processor) { while (!queue.isEmpty()) { final CommitEntry entry = queue.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; @@ -669,7 +774,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Set the tip of the data tree. tip = Verify.verifyNotNull(candidate); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = ticker().read(); pendingTransactions.remove(); pendingCommits.add(entry); @@ -785,14 +890,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read())); + pendingTransactions.add(new CommitEntry(cohort, ticker().read())); return cohort; } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = shard.ticker().read(); + final long now = ticker().read(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? 
pendingCommits : pendingTransactions; @@ -904,7 +1009,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(Iterator iter, @Nonnull TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java index c0c3d6cbb0..47d07c0892 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java @@ -32,7 +32,10 @@ abstract class ShardDataTreeMetadata> // Lifecycle events abstract void onTransactionCommitted(TransactionIdentifier txId); + abstract void onHistoryCreated(LocalHistoryIdentifier historyId); + abstract void onHistoryClosed(LocalHistoryIdentifier historyId); abstract void onHistoryPurged(LocalHistoryIdentifier historyId); + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 312e11290c..2554151dd7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; @NotThreadSafe final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class); private final LocalHistoryIdentifier chainId; private final ShardDataTree dataTree; @@ -67,6 +68,7 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent void close() { closed = true; + LOG.debug("Closing chain {}", chainId); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java new file mode 100644 index 0000000000..5dc8e5f103 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload persisted when a local history is closed cleanly. It contains a {@link LocalHistoryIdentifier}. + * + * @author Robert Varga + */ +public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } + + @Override + protected CloseLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier, + final byte[] serialized) { + return new CloseLocalHistoryPayload(identifier, serialized); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class); + private static final long serialVersionUID = 1L; + + CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) { + super(historyId, serialized); + } + + public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + historyId.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {}", historyId, e); + throw Throwables.propagate(e); + } + return new CloseLocalHistoryPayload(historyId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java new file mode 100644 index 0000000000..4b824bbb4e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload persisted when a local history is created. It contains a {@link LocalHistoryIdentifier}. + * + * @author Robert Varga + */ +public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } + + @Override + protected CreateLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier, + final byte[] serialized) { + return new CreateLocalHistoryPayload(identifier, serialized); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class); + private static final long serialVersionUID = 1L; + + CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) { + super(historyId, serialized); + } + + public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + historyId.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {}", historyId, e); + throw Throwables.propagate(e); + } + return new CreateLocalHistoryPayload(historyId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java new file mode 100644 index 0000000000..91ad74d505 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload persisted when a local history is completely purged, i.e. the frontend has removed it from its tracking. + * It contains a {@link LocalHistoryIdentifier}. + * + * @author Robert Varga + */ +public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } + + @Override + protected PurgeLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier, + final byte[] serialized) { + return new PurgeLocalHistoryPayload(identifier, serialized); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class); + private static final long serialVersionUID = 1L; + + PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) { + super(historyId, serialized); + } + + public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + historyId.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {}", historyId, e); + throw Throwables.propagate(e); + } + return new PurgeLocalHistoryPayload(historyId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 45cfd29d25..6cfde54af7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -105,6 +105,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.yangtools.concepts.Identifier; import 
org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -1445,15 +1446,15 @@ public class ShardTest extends AbstractShardTest { { final Creator creator = () -> new Shard(newShardBuilder()) { @Override - void persistPayload(final TransactionIdentifier transactionId, final Payload payload, - boolean batchHint) { + void persistPayload(final Identifier id, final Payload payload, + final boolean batchHint) { // Simulate an AbortTransaction message occurring during // replication, after // persisting and before finishing the commit to the // in-memory store. - doAbortTransaction(transactionId, null); - super.persistPayload(transactionId, payload, batchHint); + doAbortTransaction(id, null); + super.persistPayload(id, payload, batchHint); } }; -- 2.36.6
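A note on the mechanism for readers skimming the diff: the patch pairs each history lifecycle payload (CreateLocalHistoryPayload, CloseLocalHistoryPayload, PurgeLocalHistoryPayload) with an optional callback. The payload is handed to the shard for persistence and replication, the callback is remembered keyed by the payload instance, and it fires only once consensus has been reached, at which point LocalHistorySuccess is sent back to the frontend. Below is a minimal, self-contained Java sketch of that replicate-then-callback pattern; ReplicationCallbackSketch and its simplified Payload stand-in are illustrative names only, not classes from this patch or the OpenDaylight code base.

    import java.util.HashMap;
    import java.util.Map;

    final class ReplicationCallbackSketch {
        // Simplified stand-in for AbstractIdentifiablePayload, keyed by instance as in the patch.
        static final class Payload {
            final String identifier;

            Payload(final String identifier) {
                this.identifier = identifier;
            }
        }

        private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();

        // Analogous to ShardDataTree.replicatePayload(): remember the callback, then hand the
        // payload over for persistence and replication.
        void replicatePayload(final Payload payload, final Runnable callback) {
            if (callback != null) {
                replicationCallbacks.put(payload, callback);
            }
            persist(payload);
        }

        // Analogous to ShardDataTree.payloadReplicationComplete(): once consensus is reached on
        // the payload, the stored callback (e.g. sending LocalHistorySuccess) is invoked.
        void payloadReplicationComplete(final Payload payload) {
            final Runnable callback = replicationCallbacks.remove(payload);
            if (callback != null) {
                callback.run();
            }
        }

        private void persist(final Payload payload) {
            // The real code calls shard.persistPayload(id, payload, true) and waits for
            // replication; this sketch short-circuits and reports completion immediately.
            payloadReplicationComplete(payload);
        }

        public static void main(final String[] args) {
            final ReplicationCallbackSketch sketch = new ReplicationCallbackSketch();
            final Payload close = new Payload("CloseLocalHistoryPayload[history-1]");
            sketch.replicatePayload(close,
                () -> System.out.println(close.identifier + " replicated, sending LocalHistorySuccess"));
        }
    }

In the patch itself this pairing appears as replicatePayload() and payloadReplicationComplete() in ShardDataTree, with LocalFrontendHistory.destroy() and purge() supplying the callbacks that send LocalHistorySuccess through the RequestEnvelope.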