return identifier;
}
+ void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+ // Intentionally a no-op for now: this metadata tracker does not record history
+ // creation eagerly. NOTE(review): auto-generated stub left in place -- confirm
+ // whether created histories must be tracked here (compare onHistoryClosed below,
+ // which does delegate via ensureHistory()).
+
+ }
+
void onHistoryClosed(final LocalHistoryIdentifier historyId) {
ensureHistory(historyId).onHistoryClosed();
}
return client;
}
+ @Override
+ void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+ // Route the creation event to the metadata tracker of the owning client.
+ ensureClient(historyId.getClientId()).onHistoryCreated(historyId);
+ }
+
@Override
void onHistoryClosed(final LocalHistoryIdentifier historyId) {
ensureClient(historyId.getClientId()).onHistoryClosed(historyId);
if (request instanceof CreateLocalHistoryRequest) {
return handleCreateHistory((CreateLocalHistoryRequest) request);
} else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
+ return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
+ return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
} else {
throw new UnsupportedRequestException(request);
}
lastSeenHistory = id.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
+ localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}
- private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+ private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now)
throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.get(id);
return new LocalHistorySuccess(id, request.getSequence());
}
- return existing.destroy(request.getSequence(), now);
+ existing.destroy(request.getSequence(), envelope, now);
+ return null;
}
- private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
- throws RequestException {
+ private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.remove(id);
- if (existing != null) {
- purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
-
- if (!existing.isDestroyed()) {
- LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
- existing.destroy(request.getSequence(), now);
- }
-
- // FIXME: record a PURGE tombstone in the journal
-
- LOG.debug("{}: purged history {}", persistenceId, id);
- } else {
+ if (existing == null) {
LOG.debug("{}: history {} has already been purged", persistenceId, id);
+ return new LocalHistorySuccess(id, request.getSequence());
}
- return new LocalHistorySuccess(id, request.getSequence());
+ LOG.debug("{}: purging history {}", persistenceId, id);
+ purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+ existing.purge(request.getSequence(), envelope, now);
+ return null;
}
@Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
* @author Robert Varga
*/
final class LocalFrontendHistory extends AbstractFrontendHistory {
- private enum State {
- OPEN,
- CLOSED,
- }
-
private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
private final ShardDataTreeTransactionChain chain;
+ private final ShardDataTree tree;
private Long lastSeenTransaction;
- private State state = State.OPEN;
- LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
- super(persistenceId, ticker);
+ LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
+ final ShardDataTreeTransactionChain chain) {
+ super(persistenceId, tree.ticker());
+ this.tree = Preconditions.checkNotNull(tree);
this.chain = Preconditions.checkNotNull(chain);
}
return chain.createReadyCohort(id, mod);
}
- LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
- if (state != State.CLOSED) {
- LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
-
- // FIXME: add any finalization as needed
- state = State.CLOSED;
- }
-
- // FIXME: record a DESTROY tombstone in the journal
- return new LocalHistorySuccess(getIdentifier(), sequence);
+ // Close this history's transaction chain. The success response is NOT sent here:
+ // it is deferred into the callback, which ShardDataTree invokes only once the
+ // CloseLocalHistoryPayload has been replicated. 'now' is captured so the reported
+ // execution time includes the replication delay.
+ void destroy(final long sequence, final RequestEnvelope envelope, final long now)
+ throws RequestException {
+ LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+ tree.closeTransactionChain(getIdentifier(), () -> {
+ // Runs after replication completes: report success with elapsed time.
+ envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+ });
}
- boolean isDestroyed() {
- return state == State.CLOSED;
+ // Purge this history. As with destroy(), success is sent only from the callback,
+ // i.e. after the PurgeLocalHistoryPayload has been replicated by ShardDataTree.
+ void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+ LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+ tree.purgeTransactionChain(getIdentifier(), () -> {
+ // Runs after replication completes: report success with elapsed time.
+ envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+ });
}
private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
}
// applyState() will be invoked once consensus is reached on the payload
- void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
+ void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
if (canSkipPayload) {
- applyState(self(), transactionId, payload);
+ applyState(self(), id, payload);
} else {
// We are faking the sender
- persistData(self(), transactionId, payload, batchHint);
+ persistData(self(), id, payload, batchHint);
}
}
doAbortTransaction(abort.getTransactionId(), getSender());
}
- void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+ void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getIdentifier());
+ final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+ store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
}
@SuppressWarnings("checkstyle:IllegalCatch")
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+
private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
+
+ /**
+ * Callbacks that need to be invoked once a payload is replicated.
+ */
+ private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
+
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final Collection<ShardDataTreeMetadata<?>> metadata;
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
- new DefaultShardDataTreeChangeListenerPublisher(),
- new DefaultShardDataChangeListenerPublisher(), "");
+ new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
}
final String logContext() {
((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
+ } else if (payload instanceof CreateLocalHistoryPayload) {
+ allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeLocalHistoryPayload) {
+ allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof DataTreeCandidatePayload) {
applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
} else {
Verify.verify(identifier instanceof TransactionIdentifier);
payloadReplicationComplete((TransactionIdentifier) identifier);
}
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+ } else {
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof CreateLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+ } else {
+ allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof PurgeLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+ } else {
+ allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ }
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
+ // Persist a payload and remember an optional callback, keyed by the payload object.
+ // The callback fires in payloadReplicationComplete() once consensus is reached.
+ private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+ if (callback != null) {
+ replicationCallbacks.put(payload, callback);
+ }
+ shard.persistPayload(id, payload, true);
+ }
+
+ // Completion hook for payloads registered via replicatePayload(): remove and run
+ // the associated callback, if any.
+ // NOTE(review): the map lookup relies on the payload passed back at completion
+ // matching the registered one (identity-based unless the payload class overrides
+ // equals/hashCode) -- confirm this holds across the persistence path.
+ private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+ final Runnable callback = replicationCallbacks.remove(payload);
+ if (callback != null) {
+ LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+ callback.run();
+ } else {
+ LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+ }
+ }
+
+
private void payloadReplicationComplete(final TransactionIdentifier txId) {
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
}
}
- ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
- ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
+ // Fan the history-created event out to every registered metadata tracker.
+ private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryCreated(historyId);
+ }
+ }
+
+ // Fan the history-closed event out to every registered metadata tracker.
+ private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryClosed(historyId);
+ }
+ }
+
+ // Fan the history-purged event out to every registered metadata tracker.
+ private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryPurged(historyId);
+ }
+ }
+
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+ ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
- chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
- transactionChains.put(localHistoryIdentifier, chain);
+ chain = new ShardDataTreeTransactionChain(historyId, this);
+ transactionChains.put(historyId, chain);
+ shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
}
return chain;
}
}
+ /**
+ * Immediately close all transaction chains.
+ */
void closeAllTransactionChains() {
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
transactionChains.clear();
}
- void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
- final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
- if (chain != null) {
- chain.close();
- } else {
- LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
+ /**
+ * Close a single transaction chain.
+ *
+ * @param id History identifier
+ * @param callback Callback to invoke upon completion, may be null
+ */
+ void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+ // Nothing to replicate: complete the caller immediately.
+ if (callback != null) {
+ callback.run();
+ }
+ return;
}
+
+ // Close the chain locally, then record a CLOSE tombstone; the callback fires
+ // once the payload has been replicated (see replicatePayload).
+ chain.close();
+ replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ }
+
+ /**
+ * Purge a single transaction chain, removing it from tracking entirely.
+ *
+ * @param id History identifier
+ * @param callback Callback to invoke upon completion, may be null
+ */
+ void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+ if (chain == null) {
+ LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+ // Nothing to replicate: complete the caller immediately.
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ // Record a PURGE tombstone; the callback fires once it has been replicated.
+ replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ }
Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
- entry.lastAccess = shard.ticker().read();
+ entry.lastAccess = ticker().read();
return;
} catch (ConflictingModificationAppliedException e) {
LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
processNextPendingTransaction();
}
- private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+ private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+ final Consumer<CommitEntry> processor) {
while (!queue.isEmpty()) {
final CommitEntry entry = queue.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
// Set the tip of the data tree.
tip = Verify.verifyNotNull(candidate);
- entry.lastAccess = shard.ticker().read();
+ entry.lastAccess = ticker().read();
pendingTransactions.remove();
pendingCommits.add(entry);
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
- pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+ pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
return cohort;
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
- final long now = shard.ticker().read();
+ final long now = ticker().read();
final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
!pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+ private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
tip = Preconditions.checkNotNull(newTip);
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
// Lifecycle events
abstract void onTransactionCommitted(TransactionIdentifier txId);
+ abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
+
abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
+
}
@NotThreadSafe
final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
implements Identifiable<LocalHistoryIdentifier> {
+
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
private final LocalHistoryIdentifier chainId;
private final ShardDataTree dataTree;
void close() {
closed = true;
+ LOG.debug("Closing chain {}", chainId);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is closed cleanly. It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+ // Serialization proxy: carries only the pre-serialized identifier bytes on the wire.
+ private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final byte[] serialized) {
+ super(serialized);
+ }
+
+ @Override
+ protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+
+ @Override
+ protected CloseLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+ final byte[] serialized) {
+ return new CloseLocalHistoryPayload(identifier, serialized);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
+ private static final long serialVersionUID = 1L;
+
+ CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+ super(historyId, serialized);
+ }
+
+ /**
+ * Create a payload for the specified history, serializing its identifier eagerly.
+ *
+ * @param historyId identifier of the history being closed
+ * @return a new {@link CloseLocalHistoryPayload}
+ */
+ public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ historyId.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen: the output is backed by an in-memory byte array.
+ // NOTE(review): Throwables.propagate is deprecated as of Guava 20; prefer
+ // "throw new RuntimeException(e)" once the Guava baseline allows.
+ LOG.error("Failed to serialize {}", historyId, e);
+ throw Throwables.propagate(e);
+ }
+ return new CloseLocalHistoryPayload(historyId, out.toByteArray());
+ }
+
+ @Override
+ protected Proxy externalizableProxy(final byte[] serialized) {
+ return new Proxy(serialized);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is created. It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+ // Serialization proxy: carries only the pre-serialized identifier bytes on the wire.
+ private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final byte[] serialized) {
+ super(serialized);
+ }
+
+ @Override
+ protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+
+ @Override
+ protected CreateLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+ final byte[] serialized) {
+ return new CreateLocalHistoryPayload(identifier, serialized);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
+ private static final long serialVersionUID = 1L;
+
+ CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+ super(historyId, serialized);
+ }
+
+ /**
+ * Create a payload for the specified history, serializing its identifier eagerly.
+ *
+ * @param historyId identifier of the history being created
+ * @return a new {@link CreateLocalHistoryPayload}
+ */
+ public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ historyId.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen: the output is backed by an in-memory byte array.
+ // NOTE(review): Throwables.propagate is deprecated as of Guava 20; prefer
+ // "throw new RuntimeException(e)" once the Guava baseline allows.
+ LOG.error("Failed to serialize {}", historyId, e);
+ throw Throwables.propagate(e);
+ }
+ return new CreateLocalHistoryPayload(historyId, out.toByteArray());
+ }
+
+ @Override
+ protected Proxy externalizableProxy(final byte[] serialized) {
+ return new Proxy(serialized);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is completely purged, i.e. the frontend has removed it from its tracking.
+ * It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+ // Serialization proxy: carries only the pre-serialized identifier bytes on the wire.
+ private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final byte[] serialized) {
+ super(serialized);
+ }
+
+ @Override
+ protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+
+ @Override
+ protected PurgeLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+ final byte[] serialized) {
+ return new PurgeLocalHistoryPayload(identifier, serialized);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
+ private static final long serialVersionUID = 1L;
+
+ PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+ super(historyId, serialized);
+ }
+
+ /**
+ * Create a payload for the specified history, serializing its identifier eagerly.
+ *
+ * @param historyId identifier of the history being purged
+ * @return a new {@link PurgeLocalHistoryPayload}
+ */
+ public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ historyId.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen: the output is backed by an in-memory byte array.
+ // NOTE(review): Throwables.propagate is deprecated as of Guava 20; prefer
+ // "throw new RuntimeException(e)" once the Guava baseline allows.
+ LOG.error("Failed to serialize {}", historyId, e);
+ throw Throwables.propagate(e);
+ }
+ return new PurgeLocalHistoryPayload(historyId, out.toByteArray());
+ }
+
+ @Override
+ protected Proxy externalizableProxy(final byte[] serialized) {
+ return new Proxy(serialized);
+ }
+}
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
{
final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
@Override
- void persistPayload(final TransactionIdentifier transactionId, final Payload payload,
- boolean batchHint) {
+ void persistPayload(final Identifier id, final Payload payload,
+ final boolean batchHint) {
// Simulate an AbortTransaction message occurring during
// replication, after
// persisting and before finishing the commit to the
// in-memory store.
- doAbortTransaction(transactionId, null);
- super.persistPayload(transactionId, payload, batchHint);
+ doAbortTransaction(id, null);
+ super.persistPayload(id, payload, batchHint);
}
};