BUG-5280: persist metadata in snaphots 74/42974/19
authorRobert Varga <rovarga@cisco.com>
Mon, 1 Aug 2016 21:20:11 +0000 (23:20 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Fri, 12 Aug 2016 08:39:13 +0000 (08:39 +0000)
This patch adds the wiring in ShardDataTree to persist
various pieces of metadata in a snapshot. It also includes
metadata recovery from a snapshot.

In order to make this work, this patch centralizes all
actual payload and snapshot handling within the ShardDataTree
by introducing explicit entrypoints for each avenue through
which data can be introduced.

Change-Id: Ibc15bd152bd44dd583d67bb7fc61bc8f3086df30
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java

index 25a7ee86229dcccf7b42a4c2d0eae1561c43ae12..a971ee6ad55f19f36d5e0b3c376b0d6765b64d6b 100644 (file)
@@ -14,12 +14,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
@@ -27,7 +22,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
  * @deprecated Deprecated in Boron in favor of CommitTransactionPayload
  */
 @Deprecated
-final class DataTreeCandidatePayload extends Payload implements DataTreeCandidateSupplier, Externalizable {
+final class DataTreeCandidatePayload extends Payload implements Externalizable {
     private static final long serialVersionUID = 1L;
 
     private transient byte[] serialized;
@@ -55,11 +50,8 @@ final class DataTreeCandidatePayload extends Payload implements DataTreeCandidat
         return new DataTreeCandidatePayload(out.toByteArray());
     }
 
-
-    @Override
-    public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
-        return new SimpleImmutableEntry<>(Optional.empty(),
-                DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized)));
+    public DataTreeCandidate getCandidate() throws IOException {
+        return DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized));
     }
 
     @Override
index 3fe349f798d826f741418b813f7c86e06bc701ec..fbd7c89b6fe17ea4b1ac23657ff06180fe808c3b 100644 (file)
@@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -569,21 +568,14 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
-        if (data instanceof DataTreeCandidateSupplier) {
-            if (clientActor == null) {
-                // No clientActor indicates a replica coming from the leader
-                try {
-                    store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data);
-                } catch (DataValidationFailedException | IOException e) {
-                    LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
-                }
-            } else {
-                // Replication consensus reached, proceed to commit
-                store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data);
+        if (data instanceof Payload) {
+            try {
+                store.applyReplicatedPayload(identifier, (Payload)data);
+            } catch (DataValidationFailedException | IOException e) {
+                LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
             }
         } else {
-            LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
-                data.getClass().getClassLoader());
+            LOG.error("{}: Unknown state for {} received {}", persistenceId(), identifier, data);
         }
     }
 
index 89fa8fbc2507fc1f25d8de01cabb48a0cfd4359d..f1d37872fd20262ae4e3c114f8ad96c6cf7e7e75 100644 (file)
@@ -12,7 +12,11 @@ import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.primitives.UnsignedLong;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
@@ -27,15 +31,18 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -90,6 +97,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
+    private final Collection<ShardDataTreeMetadata<?>> metadata;
     private final TipProducingDataTree dataTree;
     private final String logContext;
     private final Shard shard;
@@ -99,14 +107,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
-        this.dataTree = dataTree;
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final ShardDataTreeMetadata<?>... metadata) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
         updateSchemaContext(schemaContext);
 
         this.shard = Preconditions.checkNotNull(shard);
         this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
+        this.metadata = ImmutableList.copyOf(metadata);
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@@ -139,18 +149,205 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(schemaContext);
     }
 
-    ShardDataTreeSnapshot takeRecoverySnapshot() {
-        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+    /**
+     * Take a snapshot of current state for later recovery.
+     *
+     * @return A state snapshot
+     */
+    @Nonnull ShardDataTreeSnapshot takeStateSnapshot() {
+        final NormalizedNode<?, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+        final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
+                ImmutableMap.builder();
+
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            final ShardDataTreeSnapshotMetadata<?> meta = m.toStapshot();
+            if (meta != null) {
+                metaBuilder.put(meta.getType(), meta);
+            }
+        }
+
+        return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
     }
 
-    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
-        // FIXME: purge any outstanding transactions
+    private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+            final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
+        final Stopwatch elapsed = Stopwatch.createStarted();
 
-        final DataTreeModification snapshot = transaction.getSnapshot();
-        snapshot.ready();
+        if (!pendingTransactions.isEmpty()) {
+            LOG.warn("{}: applying state snapshot with pending transactions", logContext);
+        }
+
+        final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> snapshotMeta;
+        if (snapshot instanceof MetadataShardDataTreeSnapshot) {
+            snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+        } else {
+            snapshotMeta = ImmutableMap.of();
+        }
+
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            final ShardDataTreeSnapshotMetadata<?> s = snapshotMeta.get(m.getSupportedType());
+            if (s != null) {
+                m.applySnapshot(s);
+            } else {
+                m.reset();
+            }
+        }
+
+        final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
+        // delete everything first
+        mod.delete(YangInstanceIdentifier.EMPTY);
+
+        final java.util.Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+        if (maybeNode.isPresent()) {
+            // Add everything from the remote node back
+            mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+        }
+        mod.ready();
+
+        final DataTreeModification unwrapped = unwrap(mod);
+        dataTree.validate(unwrapped);
+        dataTree.commit(dataTree.prepare(unwrapped));
+        LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+    }
+
+    private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
+        return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+    }
+
+    private static DataTreeModification unwrap(final DataTreeModification modification) {
+        if (modification instanceof PruningDataTreeModification) {
+            return ((PruningDataTreeModification)modification).delegate();
+        }
+        return modification;
+    }
+
+    /**
+     * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+     * pruning in an attempt to adjust the state to our current SchemaContext.
+     *
+     * @param snapshot Snapshot that needs to be applied
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        applySnapshot(snapshot, this::wrapWithPruning);
+    }
 
-        dataTree.validate(snapshot);
-        dataTree.commit(dataTree.prepare(snapshot));
+
+    /**
+     * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+     * does not perform any pruning.
+     *
+     * @param snapshot Snapshot that needs to be applied
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        applySnapshot(snapshot, UnaryOperator.identity());
+    }
+
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+        final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
+        DataTreeCandidates.applyToModification(mod, candidate);
+        mod.ready();
+
+        final DataTreeModification unwrapped = mod.delegate();
+        LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+        dataTree.validate(unwrapped);
+        dataTree.commit(dataTree.prepare(unwrapped));
+    }
+
+    /**
+     * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+     * pruning in an attempt to adjust the state to our current SchemaContext.
+     *
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+        if (payload instanceof CommitTransactionPayload) {
+            final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+            applyRecoveryCandidate(e.getValue());
+            allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof DataTreeCandidatePayload) {
+            applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
+        } else {
+            LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+        }
+    }
+
+    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+            throws DataValidationFailedException {
+        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+        DataTreeCandidates.applyToModification(mod, foreign);
+        mod.ready();
+
+        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+        dataTree.validate(mod);
+        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        dataTree.commit(candidate);
+
+        notifyListeners(candidate);
+    }
+
+    /**
+     * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+     * SchemaContexts match and does not perform any pruning.
+     *
+     * @param identifier Payload identifier as returned from RaftActor
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+            DataValidationFailedException {
+        /*
+         * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+         * if we are the leader and it has originated with us.
+         *
+         * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+         * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+         * acting in that capacity anymore.
+         *
+         * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+         * pre-Boron state -- which limits the number of options here.
+         */
+        if (payload instanceof CommitTransactionPayload) {
+            if (identifier == null) {
+                final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+                applyReplicatedCandidate(e.getKey(), e.getValue());
+                allMetadataCommittedTransaction(e.getKey());
+            } else {
+                Verify.verify(identifier instanceof TransactionIdentifier);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
+            }
+        } else {
+            LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+        }
+    }
+
+    private void payloadReplicationComplete(final TransactionIdentifier txId) {
+        final CommitEntry current = pendingTransactions.peek();
+        if (current == null) {
+            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            return;
+        }
+
+        if (!current.cohort.getIdentifier().equals(txId)) {
+            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+                current.cohort.getIdentifier(), txId);
+            return;
+        }
+
+        finishCommit(current.cohort);
+    }
+
+    private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.transactionCommitted(txId);
+        }
     }
 
     private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
@@ -249,20 +446,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return pendingTransactions.size();
     }
 
-    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
-        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
-
-        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
-        DataTreeCandidates.applyToModification(mod, foreign);
-        mod.ready();
-
-        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
-        dataTree.validate(mod);
-        final DataTreeCandidate candidate = dataTree.prepare(mod);
-        dataTree.commit(candidate);
-        notifyListeners(candidate);
-    }
-
     @Override
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
         // Intentional no-op
@@ -288,12 +471,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return dataTree.takeSnapshot().newModification();
     }
 
+    /**
+     * @deprecated This method violates DataTree containment and will be removed.
+     */
     @VisibleForTesting
-    // FIXME: This should be removed, it violates encapsulation
+    @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
         modification.ready();
         dataTree.validate(modification);
-        DataTreeCandidateTip candidate = dataTree.prepare(modification);
+        DataTreeCandidate candidate = dataTree.prepare(modification);
         dataTree.commit(candidate);
         return candidate;
     }
@@ -404,24 +590,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
 
         try {
-            try {
-                dataTree.commit(candidate);
-            } catch (IllegalStateException e) {
-                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
-                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
-                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
-                // applying it to the state. We then become the leader and a second tx is pre-committed and
-                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
-                // candidate via applyState prior to the second tx. Since the second tx has already been
-                // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
-                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
-                // solution will be forthcoming.
-
-                LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
-                applyForeignCandidate(txId, candidate);
-            }
+            dataTree.commit(candidate);
         } catch (Exception e) {
+            LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
             failCommit(e);
             return;
         }
@@ -430,7 +601,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         // FIXME: propagate journal index
-
         pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
 
         LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
@@ -468,28 +638,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
     }
 
-    private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
-        final CommitEntry current = pendingTransactions.peek();
-        if (current == null) {
-            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
-            return;
-        }
-
-        if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
-                current.cohort.getIdentifier(), txId);
-            return;
-        }
-
-        finishCommit(current.cohort);
-    }
-
-    void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
-        // For now we do not care about anything else but transactions
-        Verify.verify(identifier instanceof TransactionIdentifier);
-        payloadReplicationComplete((TransactionIdentifier)identifier, payload);
-    }
-
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
@@ -502,11 +650,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return cohort;
     }
 
-    void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
-            throws DataValidationFailedException, IOException {
-        applyForeignCandidate(identifier, payload.getCandidate().getValue());
-    }
-
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = shard.ticker().read();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java
new file mode 100644 (file)
index 0000000..8a62a2b
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Verify;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+
+abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
+    final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata<?> snapshot) {
+        Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot,
+            getSupportedType());
+        doApplySnapshot(getSupportedType().cast(snapshot));
+    }
+
+    abstract void reset();
+
+    abstract void doApplySnapshot(@Nonnull T snapshot);
+
+    abstract @Nonnull Class<T> getSupportedType();
+
+    abstract @Nullable T toStapshot();
+
+    // Lifecycle events
+    abstract void transactionCommitted(TransactionIdentifier txId);
+}
index c53375919396a7628386d8ae81f472f2d3e90fdc..70a701075ec6b5867e7a765629fcc65d6b72d8cb 100644 (file)
@@ -10,22 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.io.File;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
-import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 
 /**
@@ -41,71 +30,51 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private final ShardDataTree store;
     private final String shardName;
     private final Logger log;
-    private PruningDataTreeModification transaction;
-    private int size;
     private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(ShardDataTree store,  byte[] restoreFromSnapshot, String shardName, Logger log) {
+    private boolean open;
+
+    ShardRecoveryCoordinator(final ShardDataTree store,  final byte[] restoreFromSnapshot, final String shardName, final Logger log) {
         this.store = Preconditions.checkNotNull(store);
-        this.restoreFromSnapshot = restoreFromSnapshot;
         this.shardName = Preconditions.checkNotNull(shardName);
         this.log = Preconditions.checkNotNull(log);
+
+        this.restoreFromSnapshot = restoreFromSnapshot;
     }
 
     @Override
-    public void startLogRecoveryBatch(int maxBatchSize) {
+    public void startLogRecoveryBatch(final int maxBatchSize) {
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
-        transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(),
-            store.getSchemaContext());
-        size = 0;
+        open = true;
     }
 
     @Override
-    public void appendRecoveredLogEntry(Payload payload) {
-        Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+    public void appendRecoveredLogEntry(final Payload payload) {
+        Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
 
         try {
-            if (payload instanceof DataTreeCandidateSupplier) {
-                final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
-                        ((DataTreeCandidateSupplier)payload).getCandidate();
-
-                DataTreeCandidates.applyToModification(transaction, e.getValue());
-                size++;
-
-                if (e.getKey().isPresent()) {
-                    // FIXME: BUG-5280: propagate transaction state
-                }
-            } else {
-                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
-            }
-        } catch (IOException e) {
-            log.error("{}: Error extracting payload", shardName, e);
+            store.applyRecoveryPayload(payload);
+        } catch (Exception e) {
+            log.error("{}: failed to apply payload {}", shardName, payload, e);
+            throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+                shardName, payload), e);
         }
     }
 
-    private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
-        store.commit(tx.getResultingModification());
-    }
-
     /**
      * Applies the current batched log entries to the data store.
      */
     @Override
     public void applyCurrentLogRecoveryBatch() {
-        Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+        Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+        open = false;
+    }
 
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
-        try {
-            commitTransaction(transaction);
-        } catch (Exception e) {
-            File file = new File(System.getProperty("karaf.data", "."),
-                    "failed-recovery-batch-" + shardName + ".out");
-            DataTreeModificationOutput.toFile(file, transaction.getResultingModification());
-            throw new RuntimeException(String.format(
-                    "%s: Failed to apply recovery batch. Modification data was written to file %s",
-                    shardName, file), e);
-        }
-        transaction = null;
+    private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+        final File file = new File(System.getProperty("karaf.data", "."),
+            "failed-" + kind + "-snapshot-" + shardName + ".xml");
+        NormalizedNodeXMLOutput.toFile(file, node);
+        return file;
     }
 
     /**
@@ -120,26 +89,19 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
         final ShardDataTreeSnapshot snapshot;
         try {
             snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (IOException e) {
-            log.error("{}: failed to deserialize snapshot", e);
+        } catch (Exception e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
             throw Throwables.propagate(e);
         }
 
-        final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
-                store.getDataTree(), store.getSchemaContext());
-
-        final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
-        tx.write(YangInstanceIdentifier.EMPTY, node);
-
         try {
-            commitTransaction(tx);
+            store.applyRecoverySnapshot(snapshot);
         } catch (Exception e) {
-            File file = new File(System.getProperty("karaf.data", "."),
-                    "failed-recovery-snapshot-" + shardName + ".xml");
-            NormalizedNodeXMLOutput.toFile(file, node);
-            throw new RuntimeException(String.format(
-                    "%s: Failed to apply recovery snapshot. Node data was written to file %s",
-                    shardName, file), e);
+            log.error("{}: failed to apply snapshot {}", shardName, snapshot, e);
+
+            final File f = writeRoot("recovery", snapshot.getRootNode().orElse(null));
+            throw new IllegalStateException(String.format(
+                    "%s: Failed to apply recovery snapshot. Node data was written to file %s", shardName, f), e);
         }
     }
 
index 8bef15bbbaea1b663ca376359f602e7839b9a562..adf60a0c21441a2bd0e0fde37c2416d1d28c81b0 100644 (file)
@@ -11,18 +11,14 @@ import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 
 /**
@@ -33,17 +29,13 @@ import org.slf4j.Logger;
 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
 
-    private final LocalHistoryIdentifier applyHistoryId;
     private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
-    private long applyCounter;
-
     private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
             final ShardDataTree store, final Logger log, final String logId) {
-        this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId);
         this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
         this.store = Preconditions.checkNotNull(store);
         this.log = log;
@@ -66,10 +58,17 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     @Override
     public void createSnapshot(final ActorRef actorRef) {
         // Forward the request to the snapshot actor
-        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef);
+        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef);
     }
 
-    private void deserializeAndApplySnapshot(final byte[] snapshotBytes) {
+    @Override
+    public void applySnapshot(final byte[] snapshotBytes) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+
+        log.info("{}: Applying snapshot", logId);
+
         final ShardDataTreeSnapshot snapshot;
         try {
             snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
@@ -79,33 +78,12 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         }
 
         try {
-            final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
-                new TransactionIdentifier(applyHistoryId, applyCounter++));
-
-            // delete everything first
-            transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY);
-
-            final Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
-            if (maybeNode.isPresent()) {
-                // Add everything from the remote node back
-                transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get());
-            }
-
-            store.applyRecoveryTransaction(transaction);
+            store.applySnapshot(snapshot);
         } catch (Exception e) {
-            log.error("{}: An exception occurred when applying snapshot", logId, e);
+            log.error("{}: Failed to apply snapshot {}", logId, snapshot, e);
+            return;
         }
 
-    }
-
-    @Override
-    public void applySnapshot(final byte[] snapshotBytes) {
-        // Since this will be done only on Recovery or when this actor is a Follower
-        // we can safely commit everything in here. We not need to worry about event notifications
-        // as they would have already been disabled on the follower
-
-        log.info("{}: Applying snapshot", logId);
-        deserializeAndApplySnapshot(snapshotBytes);
         log.info("{}: Done applying snapshot", logId);
     }
 }
index bb016a28bdb29b994a8c23697e6c6fe7843a2d2b..5fac0abe6b0a221fafcf676a9764dfa66955e0be 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
@@ -61,15 +60,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide
     }
 
     @Override
-    public DataTreeModification getDataTreeModification() {
-        DataTreeModification dataTreeModification = transaction;
-        if (transaction instanceof PruningDataTreeModification){
-            dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
-        }
-        return dataTreeModification;
+
+    DataTreeModification getDataTreeModification() {
+        return transaction;
     }
 
-    private void checkState(State expected) {
+    private void checkState(final State expected) {
         Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
     }
 
index dd27b4e629d2b593bc37e717ad52446e6b769eb2..c348727cf89f4136a8e00ec1719621a1154e5d9c 100644 (file)
@@ -19,7 +19,6 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
-import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -31,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
  * @author Robert Varga
  */
 @Beta
-public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable {
+public final class CommitTransactionPayload extends Payload implements Serializable {
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
         private byte[] serialized;
@@ -78,10 +77,9 @@ public final class CommitTransactionPayload extends Payload implements DataTreeC
         return new CommitTransactionPayload(out.toByteArray());
     }
 
-    @Override
-    public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
+    public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
         final DataInput in = ByteStreams.newDataInput(serialized);
-        return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)),
+        return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
                 DataTreeCandidateInputOutput.readDataTreeCandidate(in));
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java
deleted file mode 100644 (file)
index 4cd1c4a..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.persisted;
-
-import com.google.common.annotations.Beta;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-/**
- * Interim interface for consolidating DataTreeCandidatePayload and {@link CommitTransactionPayload}.
- *
- * @author Robert Varga
- */
-@Beta
-public interface DataTreeCandidateSupplier {
-    Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException;
-}
index 682c0b73ece07ca534b802a94b0c6da5ba67fc2e..8cde0d953287691b440d10476ecbe6e801710383 100644 (file)
@@ -20,6 +20,8 @@ import java.io.Serializable;
 import java.util.Map;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link AbstractVersionedShardDataTreeSnapshot} which contains additional metadata.
@@ -30,8 +32,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot implements Serializable {
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
+        private static final Logger LOG = LoggerFactory.getLogger(MetadataShardDataTreeSnapshot.class);
 
-        private Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+        private Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata;
         private NormalizedNode<?, ?> rootNode;
 
         public Proxy() {
@@ -46,7 +49,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeInt(metadata.size());
-            for (ShardDataTreeSnapshotMetadata m : metadata.values()) {
+            for (ShardDataTreeSnapshotMetadata<?> m : metadata.values()) {
                 out.writeObject(m);
             }
 
@@ -59,11 +62,15 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
             Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
 
             // Default pre-allocate is 4, which should be fine
-            final Builder<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metaBuilder =
+            final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
                     ImmutableMap.builder();
             for (int i = 0; i < metaSize; ++i) {
-                final ShardDataTreeSnapshotMetadata m = (ShardDataTreeSnapshotMetadata) in.readObject();
-                metaBuilder.put(m.getClass(), m);
+                final ShardDataTreeSnapshotMetadata<?> m = (ShardDataTreeSnapshotMetadata<?>) in.readObject();
+                if (m != null) {
+                    metaBuilder.put(m.getType(), m);
+                } else {
+                    LOG.warn("Skipping null metadata");
+                }
             }
 
             metadata = metaBuilder.build();
@@ -77,7 +84,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
 
     private static final long serialVersionUID = 1L;
 
-    private final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+    private final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata;
     private final NormalizedNode<?, ?> rootNode;
 
     public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode) {
@@ -85,12 +92,12 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
     }
 
     public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
-            final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata) {
+            final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
         this.rootNode = Preconditions.checkNotNull(rootNode);
         this.metadata = ImmutableMap.copyOf(metadata);
     }
 
-    public Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> getMetadata() {
+    public Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> getMetadata() {
         return metadata;
     }
 
index a20ec4eba4e04461e30a30f1b470076dd2591a7b..7941c9fa599a5ffee00321564b635e56b68be520 100644 (file)
@@ -26,7 +26,7 @@ import javax.annotation.Nonnull;
  *
  * @author Robert Varga
  */
-public abstract class ShardDataTreeSnapshotMetadata implements Serializable {
+public abstract class ShardDataTreeSnapshotMetadata<T extends ShardDataTreeSnapshotMetadata<T>> implements Serializable {
     private static final long serialVersionUID = 1L;
 
     ShardDataTreeSnapshotMetadata() {
@@ -43,4 +43,6 @@ public abstract class ShardDataTreeSnapshotMetadata implements Serializable {
      * @return Externalizable proxy, may not be null
      */
     protected abstract @Nonnull Externalizable externalizableProxy();
+
+    public abstract Class<T> getType();
 }
index 69e2c88d268afb5ca9e38016e16e064cacdbda68..697b0c516a4000ae89e1ebbc9567d62a9a82b127 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ForwardingObject;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
@@ -29,7 +31,7 @@ import org.slf4j.LoggerFactory;
  * The PruningDataTreeModification first removes all entries from the data which do not belong in the schemaContext
  * before delegating it to the actual DataTreeModification
  */
-public class PruningDataTreeModification implements DataTreeModification {
+public class PruningDataTreeModification extends ForwardingObject implements DataTreeModification {
 
     private static final Logger LOG = LoggerFactory.getLogger(PruningDataTreeModification.class);
     private DataTreeModification delegate;
@@ -37,9 +39,14 @@ public class PruningDataTreeModification implements DataTreeModification {
     private final DataTree dataTree;
 
     public PruningDataTreeModification(DataTreeModification delegate, DataTree dataTree, SchemaContext schemaContext) {
-        this.delegate = delegate;
-        this.dataTree = dataTree;
-        this.schemaContext = schemaContext;
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.schemaContext = Preconditions.checkNotNull(schemaContext);
+    }
+
+    @Override
+    public DataTreeModification delegate() {
+        return delegate;
     }
 
     @Override
@@ -140,10 +147,6 @@ public class PruningDataTreeModification implements DataTreeModification {
         return pruner.normalizedNode();
     }
 
-    public DataTreeModification getResultingModification(){
-        return delegate;
-    }
-
     private static class PruningDataTreeModificationCursor extends AbstractDataTreeModificationCursor {
         private final DataTreeModification toModification;
         private final PruningDataTreeModification pruningModification;
index cce9bddde01aefba9f6613d9ee079ba2488d3ca9..3a7b89f91e922a2812b6eb4dba72fb4d68faa786 100644 (file)
@@ -119,13 +119,13 @@ public class DataTreeCandidatePayloadTest {
     @Test
     public void testCandidateSerDes() throws IOException {
         final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+        assertCandidateEquals(candidate, payload.getCandidate());
     }
 
     @Test
     public void testPayloadSerDes() throws IOException {
         final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue());
+        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -140,7 +140,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+        assertCandidateEquals(candidate, payload.getCandidate());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -156,7 +156,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+        assertCandidateEquals(candidate, payload.getCandidate());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -172,7 +172,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+        assertCandidateEquals(candidate, payload.getCandidate());
     }
 
     @Test
@@ -183,6 +183,6 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+        assertCandidateEquals(candidate, payload.getCandidate());
     }
 }
index d4dcc9cda267943b6b48406757fd359fc820f63b..243b2cb7e20fb81ecf16161743e9bfe460137b00 100644 (file)
@@ -393,7 +393,7 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testPeerAddressResolved() throws Exception {
         new ShardTestKit(getSystem()) {{
-            ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
+            final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
                     peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
@@ -402,7 +402,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
 
             shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
-            OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
+            final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
             assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
         }};
     }
@@ -433,14 +433,14 @@ public class ShardTest extends AbstractShardTest {
 
         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
-        Stopwatch sw = Stopwatch.createStarted();
+        final Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
 
             try {
                 assertEquals("Root node", expected, readStore(shard, root));
                 return;
-            } catch(AssertionError e) {
+            } catch(final AssertionError e) {
                 // try again
             }
         }
@@ -455,19 +455,28 @@ public class ShardTest extends AbstractShardTest {
 
         ShardTestKit.waitUntilLeader(shard);
 
-        final DataTree source = setupInMemorySnapshotStore();
-        final DataTreeModification writeMod = source.takeSnapshot().newModification();
-        ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+        store.setSchemaContext(SCHEMA_CONTEXT);
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
+        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
+                Collections.<ReplicatedLogEntry> emptyList(), 1, 2, 3, 4);
+
+        shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
+
+        final DataTreeModification writeMod = store.takeSnapshot().newModification();
+        final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         writeMod.write(TestModel.TEST_PATH, node);
         writeMod.ready();
 
         final TransactionIdentifier tx = nextTransactionId();
         final ApplyState applyState = new ApplyState(null, tx,
-            new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
+                new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx)));
 
         shard.tell(applyState, shard);
 
-        Stopwatch sw = Stopwatch.createStarted();
+        final Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
 
@@ -532,7 +541,7 @@ public class ShardTest extends AbstractShardTest {
             final TransactionIdentifier transactionID2 = nextTransactionId();
             final TransactionIdentifier transactionID3 = nextTransactionId();
 
-            Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+            final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
                     shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
             final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
             final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
@@ -797,9 +806,9 @@ public class ShardTest extends AbstractShardTest {
             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
             // write will not validate the children for performance reasons.
 
-            TransactionIdentifier transactionID = nextTransactionId();
+            final TransactionIdentifier transactionID = nextTransactionId();
 
-            ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+            final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
 
@@ -808,7 +817,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(batched, getRef());
             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 
-            Throwable cause = failure.cause();
+            final Throwable cause = failure.cause();
 
             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
             batched.setReady(true);
@@ -1108,7 +1117,7 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             if(readWrite) {
-                ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+                final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
                         newReadWriteTransaction(transactionID);
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
             } else {
@@ -1764,7 +1773,7 @@ public class ShardTest extends AbstractShardTest {
             // Now send CanCommitTransaction - should fail.
 
             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
-            Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+            final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
             assertTrue("Failure type", failure instanceof IllegalStateException);
 
             // Ready and CanCommit another and verify success.
@@ -2056,7 +2065,7 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            String testName = "testClusteredDataChangeListenerDelayedRegistration";
+            final String testName = "testClusteredDataChangeListenerDelayedRegistration";
             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2089,7 +2098,7 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testClusteredDataChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            String testName = "testClusteredDataChangeListenerRegistration";
+            final String testName = "testClusteredDataChangeListenerRegistration";
             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
 
@@ -2110,7 +2119,7 @@ public class ShardTest extends AbstractShardTest {
                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-            String leaderPath = waitUntilLeader(followerShard);
+            final String leaderPath = waitUntilLeader(followerShard);
             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
@@ -2132,7 +2141,7 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+            final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2163,7 +2172,7 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            String testName = "testClusteredDataTreeChangeListenerRegistration";
+            final String testName = "testClusteredDataTreeChangeListenerRegistration";
             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
 
@@ -2184,7 +2193,7 @@ public class ShardTest extends AbstractShardTest {
                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-            String leaderPath = waitUntilLeader(followerShard);
+            final String leaderPath = waitUntilLeader(followerShard);
             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
index f25153f335aaf48bcf3e5d2b039fd499bbb1513d..2edd6197935c374496060a4ce8d46c141f5f98c9 100644 (file)
@@ -53,7 +53,7 @@ public class ShardDataTreeSnapshotTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> expMetadata =
+        Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> expMetadata =
                 ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
         byte[] serialized = snapshot.serialize();
@@ -84,7 +84,7 @@ public class ShardDataTreeSnapshotTest {
         assertEquals("Deserialized type", PreBoronShardDataTreeSnapshot.class, deserialized.getClass());
     }
 
-    static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata {
+    static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata<TestShardDataTreeSnapshotMetadata> {
         private static final long serialVersionUID = 1L;
 
         private final String data;
@@ -93,6 +93,11 @@ public class ShardDataTreeSnapshotTest {
             this.data = data;
         }
 
+        @Override
+        public Class<TestShardDataTreeSnapshotMetadata> getType() {
+            return TestShardDataTreeSnapshotMetadata.class;
+        }
+
         @Override
         protected Externalizable externalizableProxy() {
             return new Proxy(data);
@@ -108,7 +113,6 @@ public class ShardDataTreeSnapshotTest {
             return data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
         }
 
-
         private static class Proxy implements Externalizable {
             private String data;
 
index c078c94637aea11b944ec48d6c5e0d32d8d3a9f1..4671a8ef15f28e0a4f2f23c2b92d9b5dcdb6f459 100644 (file)
@@ -293,7 +293,7 @@ public class PruningDataTreeModificationTest {
 
     private DataTreeCandidateTip getCandidate() throws DataValidationFailedException {
         pruningDataTreeModification.ready();
-        DataTreeModification mod = pruningDataTreeModification.getResultingModification();
+        DataTreeModification mod = pruningDataTreeModification.delegate();
         mod = mod == proxyModification ? realModification : mod;
         dataTree.validate(mod);
         DataTreeCandidateTip candidate = dataTree.prepare(mod);