From: Moiz Raja Date: Fri, 10 Apr 2015 15:29:20 +0000 (+0000) Subject: Merge "Update lispflowmapping options in custom.properties" X-Git-Tag: release/lithium~285 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=68fb550b416dddd0a50e0110add0a4ae9b706758;hp=73086c34bae7b37993b24c236454e0ef6b14fc83 Merge "Update lispflowmapping options in custom.properties" --- diff --git a/features/netconf/src/main/resources/features.xml b/features/netconf/src/main/resources/features.xml index dbd940f5e6..80b2e36211 100644 --- a/features/netconf/src/main/resources/features.xml +++ b/features/netconf/src/main/resources/features.xml @@ -36,6 +36,7 @@ odl-netconf-mapping-api mvn:org.opendaylight.yangtools/yang-model-api/${yangtools.version} + mvn:org.opendaylight.yangtools/yang-data-api/${yangtools.version} mvn:org.opendaylight.controller/netconf-util/${project.version} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index ed19f21ded..77eaac2cfe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -19,6 +19,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -26,6 +27,8 @@ import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -34,7 +37,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa /** * A sample actor showing how the RaftActor is to be extended */ -public class ExampleActor extends RaftActor { +public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { private final Map state = new HashMap(); @@ -118,7 +121,8 @@ public class ExampleActor extends RaftActor { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { ByteString bs = null; try { bs = fromObject(state); @@ -128,7 +132,8 @@ public class ExampleActor extends RaftActor { getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } - @Override protected void applySnapshot(byte [] snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { state.clear(); try { state.putAll((HashMap) toObject(snapshot)); @@ -192,22 +197,33 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; + } + + @Override + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void applyCurrentLogRecoveryBatch() { } @Override - protected void applyCurrentLogRecoveryBatch() { + public void onRecoveryComplete() { } @Override - protected void onRecoveryComplete() { + public void applyRecoverySnapshot(byte[] snapshot) { } @Override - protected void applyRecoverySnapshot(byte[] snapshot) { + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 1aecc89eea..c27ff37386 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; @@ -19,7 +20,7 @@ import java.util.List; public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { // We define this as ArrayList so we can use ensureCapacity. - protected ArrayList journal; + private ArrayList journal; private long snapshotIndex = -1; private long snapshotTerm = -1; @@ -28,13 +29,17 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private ArrayList snapshottedJournal; private long previousSnapshotIndex = -1; private long previousSnapshotTerm = -1; - protected int dataSize = 0; + private int dataSize = 0; public AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; this.journal = new ArrayList<>(unAppliedEntries); + + for(ReplicatedLogEntry entry: journal) { + dataSize += entry.size(); + } } public AbstractReplicatedLogImpl() { @@ -90,18 +95,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void removeFrom(long logEntryIndex) { + public long removeFrom(long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { // physical index should be less than list size and >= 0 - return; + return -1; + } + + for(int i = adjustedIndex; i < journal.size(); i++) { + dataSize -= journal.get(i).size(); } + journal.subList(adjustedIndex , journal.size()).clear(); + + return adjustedIndex; } @Override public void append(ReplicatedLogEntry replicatedLogEntry) { journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); } @Override @@ -230,4 +243,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshotTerm = previousSnapshotTerm; previousSnapshotTerm = -1; } + + @VisibleForTesting + ReplicatedLogEntry getAtPhysicalIndex(int index) { + return journal.get(index); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c30fe2317..41a807aa35 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,15 +11,10 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.japi.Procedure; -import akka.persistence.RecoveryCompleted; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.io.Serializable; @@ -27,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; @@ -36,11 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; @@ -99,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - private static final String COMMIT_SNAPSHOT = "commit_snapshot"; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** @@ -117,11 +107,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); - private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); - - private Stopwatch recoveryTimer; + private RaftActorRecoverySupport raftRecovery; - private int currentRecoveryBatchCount; + private RaftActorSnapshotMessageSupport snapshotSupport; private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); @@ -140,12 +128,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior)); } - private void initRecoveryTimer() { - if(recoveryTimer == null) { - recoveryTimer = Stopwatch.createStarted(); - } - } - @Override public void preStart() throws Exception { LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), @@ -156,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void postStop() { - if(currentBehavior != null) { + if(currentBehavior.getDelegate() != null) { try { currentBehavior.close(); } catch (Exception e) { @@ -169,134 +151,28 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleRecover(Object message) { - if(persistence().isRecoveryApplicable()) { - if (message instanceof SnapshotOffer) { - onRecoveredSnapshot((SnapshotOffer) message); - } else if (message instanceof ReplicatedLogEntry) { - onRecoveredJournalLogEntry((ReplicatedLogEntry) message); - } else if (message instanceof ApplyLogEntries) { - // Handle this message for backwards compatibility with pre-Lithium versions. - onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex()); - } else if (message instanceof ApplyJournalEntries) { - onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); - } else if (message instanceof DeleteEntries) { - replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); - } else if (message instanceof RecoveryCompleted) { - onRecoveryCompletedMessage(); - } - } else { - if (message instanceof RecoveryCompleted) { + if(raftRecovery == null) { + raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior, + getRaftActorRecoveryCohort()); + } + + boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message); + if(recoveryComplete) { + if(!persistence().isRecoveryApplicable()) { // Delete all the messages from the akka journal so that we do not end up with consistency issues // Note I am not using the dataPersistenceProvider and directly using the akka api here deleteMessages(lastSequenceNr()); // Delete all the akka snapshots as they will not be needed deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); - - onRecoveryComplete(); - - initializeBehavior(); } - } - } - - private void onRecoveredSnapshot(SnapshotOffer offer) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: SnapshotOffer called..", persistenceId()); - } - - initRecoveryTimer(); - - Snapshot snapshot = (Snapshot) offer.snapshot(); - // Create a replicated log with the snapshot information - // The replicated log can be used later on to retrieve this snapshot - // when we need to install it on a peer + onRecoveryComplete(); - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider, - currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); - context.setCommitIndex(snapshot.getLastAppliedIndex()); + initializeBehavior(); - Stopwatch timer = Stopwatch.createStarted(); - - // Apply the snapshot to the actors state - applyRecoverySnapshot(snapshot.getState()); - - timer.stop(); - LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + - replicatedLog().size(), persistenceId(), timer.toString(), - replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); - } - - private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex()); + raftRecovery = null; } - - replicatedLog().append(logEntry); - } - - private void onRecoveredApplyLogEntries(long toIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - persistenceId(), context.getLastApplied() + 1, toIndex); - } - - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); - } - - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); - } - - private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { - initRecoveryTimer(); - - int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); - if(currentRecoveryBatchCount == 0) { - startLogRecoveryBatch(batchSize); - } - - appendRecoveredLogEntry(logEntry.getData()); - - if(++currentRecoveryBatchCount >= batchSize) { - endCurrentLogRecoveryBatch(); - } - } - - private void endCurrentLogRecoveryBatch() { - applyCurrentLogRecoveryBatch(); - currentRecoveryBatchCount = 0; - } - - private void onRecoveryCompletedMessage() { - if(currentRecoveryBatchCount > 0) { - endCurrentLogRecoveryBatch(); - } - - onRecoveryComplete(); - - String recoveryTime = ""; - if(recoveryTimer != null) { - recoveryTimer.stop(); - recoveryTime = " in " + recoveryTimer.toString(); - recoveryTimer = null; - } - - LOG.info( - "Recovery completed" + recoveryTime + " - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), - replicatedLog().getSnapshotTerm(), replicatedLog().size()); - - initializeBehavior(); } protected void initializeBehavior(){ @@ -309,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); } - @Override public void handleCommand(Object message) { + @Override + public void handleCommand(Object message) { + if(snapshotSupport == null) { + snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context, + currentBehavior, getRaftActorSnapshotCohort(), self()); + } + + boolean handled = snapshotSupport.handleSnapshotMessage(message); + if(handled) { + return; + } + if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -336,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistence().persist(applyEntries, NoopProcedure.instance()); - } else if(message instanceof ApplySnapshot ) { - Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm() - ); - } - - applySnapshot(snapshot.getState()); - - //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider, - currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); - } else if (message instanceof FindLeader) { getSender().tell( new FindLeaderReply(getLeaderAddress()), getSelf() ); - - } else if (message instanceof SaveSnapshotSuccess) { - SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); - - long sequenceNumber = success.metadata().sequenceNr(); - - commitSnapshot(sequenceNumber); - - } else if (message instanceof SaveSnapshotFailure) { - SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - - LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", - persistenceId(), saveSnapshotFailure.cause()); - - context.getSnapshotManager().rollback(); - - } else if (message instanceof CaptureSnapshot) { - LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - - context.getSnapshotManager().create(createSnapshotProcedure); - - } else if (message instanceof CaptureSnapshotReply) { - handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); - } else if (message.equals(COMMIT_SNAPSHOT)) { - commitSnapshot(-1); } else { reusableBehaviorStateHolder.init(getCurrentBehavior()); @@ -621,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. - self().tell(COMMIT_SNAPSHOT, self()); + self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self()); } }); } @@ -645,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } - protected void commitSnapshot(long sequenceNumber) { - context.getSnapshotManager().commit(persistence(), sequenceNumber); - } - /** * The applyState method will be called by the RaftActor when some data * needs to be applied to the actor's state @@ -670,31 +510,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Object data); /** - * This method is called during recovery at the start of a batch of state entries. Derived - * classes should perform any initialization needed to start a batch. + * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - protected abstract void startLogRecoveryBatch(int maxBatchSize); - - /** - * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startLogRecoveryBatch}. - * - * @param data the state data - */ - protected abstract void appendRecoveredLogEntry(Payload data); - - /** - * This method is called during recovery to reconstruct the state of the actor. - * - * @param snapshotBytes A snapshot of the state of the actor - */ - protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); - - /** - * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveredLogEntry}. - */ - protected abstract void applyCurrentLogRecoveryBatch(); + @Nonnull + protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** * This method is called when recovery is complete. @@ -702,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * This method will be called by the RaftActor when a snapshot needs to be - * created. The derived actor should respond with its current state. - *

- * During recovery the state that is returned by the derived actor will - * be passed back to it by calling the applySnapshot method - * - * @return The current state of the actor + * Returns the RaftActorSnapshotCohort to participate in persistence recovery. */ - protected abstract void createSnapshot(); - - /** - * This method can be called at any other point during normal - * operations when the derived actor is out of sync with it's peers - * and the only way to bring it in sync is by applying a snapshot - * - * @param snapshotBytes A snapshot of the state of the actor - */ - protected abstract void applySnapshot(byte[] snapshotBytes); + @Nonnull + protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -753,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return peerAddress; } - private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - - context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory()); - } - protected boolean hasFollowers(){ return getRaftActorContext().hasFollowers(); } @@ -795,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private class CreateSnapshotProcedure implements Procedure { - - @Override - public void apply(Void aVoid) throws Exception { - createSnapshot(); - } - } - private static class BehaviorStateHolder { private RaftActorBehavior behavior; private String leaderId; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java new file mode 100644 index 0000000000..a9f00aa80b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; + +/** + * Interface for a class that participates in raft actor persistence recovery. + * + * @author Thomas Pantelis + */ +public interface RaftActorRecoveryCohort { + + /** + * This method is called during recovery at the start of a batch of state entries. Derived + * classes should perform any initialization needed to start a batch. + */ + void startLogRecoveryBatch(int maxBatchSize); + + /** + * This method is called during recovery to append state data to the current batch. This method + * is called 1 or more times after {@link #startLogRecoveryBatch}. + * + * @param data the state data + */ + void appendRecoveredLogEntry(Payload data); + + /** + * This method is called during recovery to reconstruct the state of the actor. + * + * @param snapshotBytes A snapshot of the state of the actor + */ + void applyRecoverySnapshot(byte[] snapshotBytes); + + /** + * This method is called during recovery at the end of a batch to apply the current batched + * log entries. This method is called after {@link #appendRecoveredLogEntry}. + */ + void applyCurrentLogRecoveryBatch(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java new file mode 100644 index 0000000000..d2c14de73d --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.persistence.RecoveryCompleted; +import akka.persistence.SnapshotOffer; +import com.google.common.base.Stopwatch; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +/** + * Support class that handles persistence recovery for a RaftActor. + * + * @author Thomas Pantelis + */ +class RaftActorRecoverySupport { + private final DataPersistenceProvider persistence; + private final RaftActorContext context; + private final RaftActorBehavior currentBehavior; + private final RaftActorRecoveryCohort cohort; + + private int currentRecoveryBatchCount; + + private Stopwatch recoveryTimer; + private final Logger log; + + RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context, + RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) { + this.persistence = persistence; + this.context = context; + this.currentBehavior = currentBehavior; + this.cohort = cohort; + this.log = context.getLogger(); + } + + boolean handleRecoveryMessage(Object message) { + boolean recoveryComplete = false; + if(persistence.isRecoveryApplicable()) { + if (message instanceof SnapshotOffer) { + onRecoveredSnapshot((SnapshotOffer) message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry) message); + } else if (message instanceof ApplyLogEntries) { + // Handle this message for backwards compatibility with pre-Lithium versions. + onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex()); + } else if (message instanceof ApplyJournalEntries) { + onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); + } else if (message instanceof DeleteEntries) { + replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + recoveryComplete = true; + } + } else if (message instanceof RecoveryCompleted) { + recoveryComplete = true; + } + + return recoveryComplete; + } + + private ReplicatedLog replicatedLog() { + return context.getReplicatedLog(); + } + + private void initRecoveryTimer() { + if(recoveryTimer == null) { + recoveryTimer = Stopwatch.createStarted(); + } + } + + private void onRecoveredSnapshot(SnapshotOffer offer) { + if(log.isDebugEnabled()) { + log.debug("{}: SnapshotOffer called..", context.getId()); + } + + initRecoveryTimer(); + + Snapshot snapshot = (Snapshot) offer.snapshot(); + + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior)); + context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); + + Stopwatch timer = Stopwatch.createStarted(); + + // Apply the snapshot to the actors state + cohort.applyRecoverySnapshot(snapshot.getState()); + + timer.stop(); + log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}", + context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); + } + + private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { + if(log.isDebugEnabled()) { + log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(), + logEntry.getIndex(), logEntry.size()); + } + + replicatedLog().append(logEntry); + } + + private void onRecoveredApplyLogEntries(long toIndex) { + if(log.isDebugEnabled()) { + log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", + context.getId(), context.getLastApplied() + 1, toIndex); + } + + for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { + batchRecoveredLogEntry(replicatedLog().get(i)); + } + + context.setLastApplied(toIndex); + context.setCommitIndex(toIndex); + } + + private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { + initRecoveryTimer(); + + int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); + if(currentRecoveryBatchCount == 0) { + cohort.startLogRecoveryBatch(batchSize); + } + + cohort.appendRecoveredLogEntry(logEntry.getData()); + + if(++currentRecoveryBatchCount >= batchSize) { + endCurrentLogRecoveryBatch(); + } + } + + private void endCurrentLogRecoveryBatch() { + cohort.applyCurrentLogRecoveryBatch(); + currentRecoveryBatchCount = 0; + } + + private void onRecoveryCompletedMessage() { + if(currentRecoveryBatchCount > 0) { + endCurrentLogRecoveryBatch(); + } + + String recoveryTime = ""; + if(recoveryTimer != null) { + recoveryTimer.stop(); + recoveryTime = " in " + recoveryTimer.toString(); + recoveryTimer = null; + } + + log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + + "Persistence Id = " + context.getId() + + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " + + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java new file mode 100644 index 0000000000..ad68726371 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +/** + * Interface for a class that participates in raft actor snapshotting. + * + * @author Thomas Pantelis + */ +public interface RaftActorSnapshotCohort { + + /** + * This method is called by the RaftActor when a snapshot needs to be + * created. The implementation should send a CaptureSnapshotReply to the given actor. + * + * @param actorRef the actor to which to respond + */ + void createSnapshot(ActorRef actorRef); + + /** + * This method is called to apply a snapshot installed by the leader. + * + * @param snapshotBytes a snapshot of the state of the actor + */ + void applySnapshot(byte[] snapshotBytes); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java new file mode 100644 index 0000000000..21c8ffa68e --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.japi.Procedure; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +/** + * Handles snapshot related messages for a RaftActor. + * + * @author Thomas Pantelis + */ +class RaftActorSnapshotMessageSupport { + static final String COMMIT_SNAPSHOT = "commit_snapshot"; + + private final DataPersistenceProvider persistence; + private final RaftActorContext context; + private final RaftActorBehavior currentBehavior; + private final RaftActorSnapshotCohort cohort; + private final ActorRef raftActorRef; + private final Logger log; + + private final Procedure createSnapshotProcedure = new Procedure() { + @Override + public void apply(Void notUsed) throws Exception { + cohort.createSnapshot(raftActorRef); + } + }; + + RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context, + RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) { + this.persistence = persistence; + this.context = context; + this.currentBehavior = currentBehavior; + this.cohort = cohort; + this.raftActorRef = raftActorRef; + this.log = context.getLogger(); + } + + boolean handleSnapshotMessage(Object message) { + if(message instanceof ApplySnapshot ) { + onApplySnapshot(((ApplySnapshot) message).getSnapshot()); + return true; + } else if (message instanceof SaveSnapshotSuccess) { + onSaveSnapshotSuccess((SaveSnapshotSuccess) message); + return true; + } else if (message instanceof SaveSnapshotFailure) { + onSaveSnapshotFailure((SaveSnapshotFailure) message); + return true; + } else if (message instanceof CaptureSnapshot) { + onCaptureSnapshot(message); + return true; + } else if (message instanceof CaptureSnapshotReply) { + onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); + return true; + } else if (message.equals(COMMIT_SNAPSHOT)) { + context.getSnapshotManager().commit(persistence, -1); + return true; + } else { + return false; + } + } + + private void onCaptureSnapshotReply(byte[] snapshotBytes) { + log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length); + + context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory()); + } + + private void onCaptureSnapshot(Object message) { + log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message); + + context.getSnapshotManager().create(createSnapshotProcedure); + } + + private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) { + log.error("{}: SaveSnapshotFailure received for snapshot Cause:", + context.getId(), saveSnapshotFailure.cause()); + + context.getSnapshotManager().rollback(); + } + + private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) { + log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId()); + + long sequenceNumber = success.metadata().sequenceNr(); + + context.getSnapshotManager().commit(persistence, sequenceNumber); + } + + private void onApplySnapshot(Snapshot snapshot) { + if(log.isDebugEnabled()) { + log.debug("{}: ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm()); + } + + cohort.applySnapshot(snapshot.getState()); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, + currentBehavior)); + context.setLastApplied(snapshot.getLastAppliedIndex()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 3e4d727c71..8388eaf743 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -51,8 +51,9 @@ public interface ReplicatedLog { * information * * @param index the index of the log entry + * @return the adjusted index of the first log entry removed or -1 if log entry not found. */ - void removeFrom(long index); + long removeFrom(long index); /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index fdb6305381..5a77b9aea3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -28,10 +28,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private final Procedure deleteProcedure = new Procedure() { @Override public void apply(DeleteEntries param) { - dataSize = 0; - for (ReplicatedLogEntry entry : journal) { - dataSize += entry.size(); - } } }; @@ -57,16 +53,11 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { @Override public void removeFromAndPersist(long logEntryIndex) { - int adjustedIndex = adjustedIndex(logEntryIndex); - - if (adjustedIndex < 0) { - return; - } - // FIXME: Maybe this should be done after the command is saved - journal.subList(adjustedIndex , journal.size()).clear(); - - persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure); + long adjustedIndex = removeFrom(logEntryIndex); + if(adjustedIndex >= 0) { + persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure); + } } @Override @@ -83,7 +74,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs - journal.add(replicatedLogEntry); + append(replicatedLogEntry); // When persisting events with persist it is guaranteed that the // persistent actor will not receive further commands between the @@ -96,8 +87,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public void apply(ReplicatedLogEntry evt) throws Exception { int logEntrySize = replicatedLogEntry.size(); - dataSize += logEntrySize; - long dataSizeForCheck = dataSize; + long dataSizeForCheck = dataSize(); dataSizeSinceLastSnapshot += logEntrySize; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index b910313b09..3c6c8281fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -112,14 +112,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } @Override - protected void createSnapshot() { + public void createSnapshot(ActorRef actorRef) { if(snapshot != null) { getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender()); } } @Override - protected void applyRecoverySnapshot(byte[] bytes) { + public void applyRecoverySnapshot(byte[] bytes) { } void setSnapshot(byte[] snapshot) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index 8fdb7ea226..c99f253657 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -15,7 +15,6 @@ import akka.japi.Procedure; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -40,14 +39,6 @@ public class AbstractReplicatedLogImplTest { } - @After - public void tearDown() { - replicatedLogImpl.journal.clear(); - replicatedLogImpl.setSnapshotIndex(-1); - replicatedLogImpl.setSnapshotTerm(-1); - replicatedLogImpl = null; - } - @Test public void testIndexOperations() { @@ -65,7 +56,7 @@ public class AbstractReplicatedLogImplTest { // now create a snapshot of 3 entries, with 1 unapplied entry left in the log // It removes the entries which have made it to snapshot // and updates the snapshot index and term - Map state = takeSnapshot(3); + takeSnapshot(3); // check the values after the snapshot. // each index value passed in the test is the logical index (log entry index) @@ -101,7 +92,7 @@ public class AbstractReplicatedLogImplTest { assertEquals(2, replicatedLogImpl.getFrom(6).size()); // take a second snapshot with 5 entries with 0 unapplied entries left in the log - state = takeSnapshot(5); + takeSnapshot(5); assertEquals(0, replicatedLogImpl.size()); assertNull(replicatedLogImpl.last()); @@ -187,19 +178,45 @@ public class AbstractReplicatedLogImplTest { assertTrue(replicatedLogImpl.isPresent(5)); } + @Test + public void testRemoveFrom() { + + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3))); + + assertEquals("dataSize", 9, replicatedLogImpl.dataSize()); + + long adjusted = replicatedLogImpl.removeFrom(4); + assertEquals("removeFrom - adjusted", 4, adjusted); + assertEquals("size", 4, replicatedLogImpl.size()); + assertEquals("dataSize", 4, replicatedLogImpl.dataSize()); + + takeSnapshot(1); + + adjusted = replicatedLogImpl.removeFrom(2); + assertEquals("removeFrom - adjusted", 1, adjusted); + assertEquals("size", 1, replicatedLogImpl.size()); + assertEquals("dataSize", 1, replicatedLogImpl.dataSize()); + + assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0)); + assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100)); + } + // create a snapshot for test public Map takeSnapshot(final int numEntries) { Map map = new HashMap<>(numEntries); - List entries = replicatedLogImpl.getEntriesTill(numEntries); - for (ReplicatedLogEntry entry : entries) { + + long lastIndex = 0; + long lastTerm = 0; + for(int i = 0; i < numEntries; i++) { + ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i); map.put(entry.getIndex(), entry.getData().toString()); + lastIndex = entry.getIndex(); + lastTerm = entry.getTerm(); } - int term = (int) replicatedLogImpl.lastTerm(); - int lastIndex = (int) entries.get(entries.size() - 1).getIndex(); - entries.clear(); - replicatedLogImpl.setSnapshotTerm(term); - replicatedLogImpl.setSnapshotIndex(lastIndex); + replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm); + replicatedLogImpl.snapshotCommit(); return map; @@ -213,15 +230,6 @@ public class AbstractReplicatedLogImplTest { public void removeFromAndPersist(final long index) { } - @Override - public int dataSize() { - return -1; - } - - public List getEntriesTill(final int index) { - return journal.subList(0, index); - } - @Override public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 17a81ac3c3..14bfd1d348 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -97,9 +98,11 @@ public class RaftActorTest extends AbstractActorTest { InMemorySnapshotStore.clear(); } - public static class MockRaftActor extends RaftActor { + public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - private final RaftActor delegate; + private final RaftActor actorDelegate; + private final RaftActorRecoveryCohort recoveryCohortDelegate; + private final RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -136,7 +139,9 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); - this.delegate = mock(RaftActor.class); + this.actorDelegate = mock(RaftActor.class); + this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); + this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); if(dataPersistenceProvider == null){ setPersistence(true); } else { @@ -197,26 +202,37 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { - delegate.applyState(clientActor, identifier, data); + actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; } @Override - protected void appendRecoveredLogEntry(Payload data) { + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; + } + + @Override + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { state.add(data); } @Override - protected void applyCurrentLogRecoveryBatch() { + public void applyCurrentLogRecoveryBatch() { } @Override protected void onRecoveryComplete() { - delegate.onRecoveryComplete(); + actorDelegate.onRecoveryComplete(); recoveryComplete.countDown(); } @@ -227,8 +243,8 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(byte[] bytes) { - delegate.applyRecoverySnapshot(bytes); + public void applyRecoverySnapshot(byte[] bytes) { + recoveryCohortDelegate.applyRecoverySnapshot(bytes); try { Object data = toObject(bytes); if (data instanceof List) { @@ -239,18 +255,21 @@ public class RaftActorTest extends AbstractActorTest { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { LOG.info("{}: createSnapshot called", persistenceId()); - delegate.createSnapshot(); + snapshotCohortDelegate.createSnapshot(actorRef); } - @Override protected void applySnapshot(byte [] snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - delegate.applySnapshot(snapshot); + snapshotCohortDelegate.applySnapshot(snapshot); } - @Override protected void onStateChanged() { - delegate.onStateChanged(); + @Override + protected void onStateChanged() { + actorDelegate.onStateChanged(); } @Override @@ -284,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest { public ReplicatedLog getReplicatedLog(){ return this.getRaftActorContext().getReplicatedLog(); } - } @@ -399,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest { // add more entries after snapshot is taken List entries = new ArrayList<>(); ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F")); + new MockRaftActorContext.MockPayload("F", 2)); ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("G")); + new MockRaftActorContext.MockPayload("G", 3)); ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("H")); + new MockRaftActorContext.MockPayload("H", 4)); entries.add(entry2); entries.add(entry3); entries.add(entry4); @@ -433,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext context = ref.underlyingActor().getRaftActorContext(); assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size()); + assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize()); assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); @@ -516,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); + verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -583,7 +602,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); + verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -810,7 +829,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); - verify(mockRaftActor.delegate).createSnapshot(); + verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -860,7 +879,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); - verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); } }; @@ -907,7 +926,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); + verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -1131,7 +1150,7 @@ public class RaftActorTest extends AbstractActorTest { .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); - verify(leaderActor.delegate).createSnapshot(); + verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1230,7 +1249,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("D")), 4); - verify(followerActor.delegate).createSnapshot(); + verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(6, followerActor.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java index d94e1c691e..81605d8c8f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java @@ -7,40 +7,17 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import scala.concurrent.Future; abstract class AbstractTransactionContext implements TransactionContext { - private final List> recordedOperationFutures = new ArrayList<>(); private final TransactionIdentifier identifier; protected AbstractTransactionContext(TransactionIdentifier identifier) { this.identifier = identifier; } - @Override - public final void copyRecordedOperationFutures(Collection> target) { - target.addAll(recordedOperationFutures); - } - protected final TransactionIdentifier getIdentifier() { return identifier; } - - protected final Collection> copyRecordedOperationFutures() { - return ImmutableList.copyOf(recordedOperationFutures); - } - - protected final int recordedOperationCount() { - return recordedOperationFutures.size(); - } - - protected final void recordOperationFuture(Future future) { - recordedOperationFutures.add(future); - } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9cd52b219a..81449c5747 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; @@ -59,22 +58,18 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; -import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -87,8 +82,6 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @VisibleForTesting @@ -104,10 +97,6 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; - private SchemaContext schemaContext; - - private int createSnapshotTransactionCounter; - private final ShardCommitCoordinator commitCoordinator; private long transactionCommitTimeout; @@ -121,15 +110,11 @@ public class Shard extends RaftActor { private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( Serialization.serializedActorPath(getSelf())); + private final DOMTransactionFactory domTransactionFactory; - /** - * Coordinates persistence recovery on startup. - */ - private ShardRecoveryCoordinator recoveryCoordinator; + private final ShardTransactionActorFactory transactionActorFactory; - private final DOMTransactionFactory transactionFactory; - - private final String txnDispatcherPath; + private final ShardSnapshotCohort snapshotCohort; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); @@ -140,9 +125,6 @@ public class Shard extends RaftActor { this.name = name.toString(); this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; - this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) - .getDispatcherPath(Dispatchers.DispatcherType.Transaction); setPersistence(datastoreContext.isPersistent()); @@ -164,9 +146,9 @@ public class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); + domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); - commitCoordinator = new ShardCommitCoordinator(transactionFactory, + commitCoordinator = new ShardCommitCoordinator(domTransactionFactory, TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); @@ -178,7 +160,11 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); - recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG); + transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext, + new Dispatchers(context().system().dispatchers()).getDispatcherPath( + Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); + + snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name); } private void setTransactionCommitTimeout() { @@ -447,8 +433,12 @@ public class Shard extends RaftActor { // if(isLeader()) { try { - BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched); - sender().tell(reply, self()); + boolean ready = commitCoordinator.handleTransactionModifications(batched); + if(ready) { + sender().tell(READY_TRANSACTION_REPLY, self()); + } else { + sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self()); + } } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -488,20 +478,21 @@ public class Shard extends RaftActor { // node. In that case, the subsequent 3-phase commit messages won't contain the // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); - ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); + if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { + ActorRef replyActorPath = getSelf(); + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { + LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath)); + new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), + ready.getTxnClientVersion()); getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, getSelf()); - + readyTransactionReply, getSelf()); } else { - - getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() : - READY_TRANSACTION_REPLY, getSelf()); + getSender().tell(READY_TRANSACTION_REPLY, getSelf()); } } @@ -558,29 +549,15 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); + domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); } private ActorRef createTypedTransactionActor(int transactionType, ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - DOMStoreTransaction transaction = transactionFactory.newTransaction( - TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(), - transactionChainId); - - return createShardTransaction(transaction, transactionId, clientVersion); - } - - private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, - short clientVersion){ - return getContext().actorOf( - ShardTransaction.props(transaction, getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion) - .withDispatcher(txnDispatcherPath), - transactionId.toString()); - + return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType), + transactionId, transactionChainId, clientVersion); } private void createTransaction(CreateTransaction createTransaction) { @@ -612,18 +589,11 @@ public class Shard extends RaftActor { return transactionActor; } - private void syncCommitTransaction(final DOMStoreWriteTransaction transaction) - throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } - private void commitWithNewTransaction(final Modification modification) { DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); modification.apply(tx); try { - syncCommitTransaction(tx); + snapshotCohort.syncCommitTransaction(tx); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { @@ -633,9 +603,7 @@ public class Shard extends RaftActor { } private void updateSchemaContext(final UpdateSchemaContext message) { - this.schemaContext = message.getSchemaContext(); updateSchemaContext(message.getSchemaContext()); - store.onGlobalContextUpdated(message.getSchemaContext()); } @VisibleForTesting @@ -649,30 +617,18 @@ public class Shard extends RaftActor { } @Override - protected - void startLogRecoveryBatch(final int maxBatchSize) { - recoveryCoordinator.startLogRecoveryBatch(maxBatchSize); + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return snapshotCohort; } @Override - protected void appendRecoveredLogEntry(final Payload data) { - recoveryCoordinator.appendRecoveredLogPayload(data); - } - - @Override - protected void applyRecoverySnapshot(final byte[] snapshotBytes) { - recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes); - } - - @Override - protected void applyCurrentLogRecoveryBatch() { - recoveryCoordinator.applyCurrentLogRecoveryBatch(); + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return new ShardRecoveryCoordinator(store, persistenceId(), LOG); } @Override protected void onRecoveryComplete() { - recoveryCoordinator = null; - //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); @@ -727,46 +683,6 @@ public class Shard extends RaftActor { } } - @Override - protected void createSnapshot() { - // Create a transaction actor. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot. THe transaction actor will - // after processing the CreateSnapshot message. - - ActorRef createSnapshotTransaction = createTransaction( - TransactionProxy.TransactionType.READ_ONLY.ordinal(), - "createSnapshot" + ++createSnapshotTransactionCounter, "", - DataStoreVersions.CURRENT_VERSION); - - createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self()); - } - - @VisibleForTesting - @Override - protected void applySnapshot(final byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - LOG.info("{}: Applying snapshot", persistenceId()); - try { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - // delete everything first - transaction.delete(DATASTORE_ROOT); - - // Add everything from the remote node back - transaction.write(DATASTORE_ROOT, node); - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { - LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e); - } finally { - LOG.info("{}: Done applying snapshot", persistenceId()); - } - } - @Override protected void onStateChanged() { boolean isLeader = isLeader(); @@ -781,7 +697,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - transactionFactory.closeAllTransactionChains(); + domTransactionFactory.closeAllTransactionChains(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 54f15fcb4b..b96e38d76a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -22,7 +22,6 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -119,7 +118,7 @@ public class ShardCommitCoordinator { * * @throws ExecutionException if an error occurs loading the cache */ - public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched) + public boolean handleTransactionModifications(BatchedModifications batched) throws ExecutionException { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { @@ -137,7 +136,6 @@ public class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); - String cohortPath = null; if(batched.isReady()) { if(log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client version {}", name, @@ -145,10 +143,9 @@ public class ShardCommitCoordinator { } cohortEntry.ready(cohortDecorator); - cohortPath = shardActorPath; } - return new BatchedModificationsReply(batched.getModifications().size(), cohortPath); + return batched.isReady(); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 2e66ef918e..41ca486eb6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -38,9 +37,8 @@ public class ShardReadTransaction extends ShardTransaction { private final DOMStoreReadTransaction transaction; public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index b394da88e8..2042e95577 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -25,9 +24,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction { private final DOMStoreReadWriteTransaction transaction; public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(transaction, shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 7e547d7257..01a124b697 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -13,6 +13,7 @@ import java.util.List; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -32,7 +33,7 @@ import org.slf4j.Logger; * * @author Thomas Panetelis */ -class ShardRecoveryCoordinator { +class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private final InMemoryDOMDataStore store; private List currentLogRecoveryBatch; @@ -45,13 +46,15 @@ class ShardRecoveryCoordinator { this.log = log; } - void startLogRecoveryBatch(int maxBatchSize) { + @Override + public void startLogRecoveryBatch(int maxBatchSize) { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); } - void appendRecoveredLogPayload(Payload payload) { + @Override + public void appendRecoveredLogEntry(Payload payload) { try { if(payload instanceof ModificationPayload) { currentLogRecoveryBatch.add((ModificationPayload) payload); @@ -83,7 +86,8 @@ class ShardRecoveryCoordinator { /** * Applies the current batched log entries to the data store. */ - void applyCurrentLogRecoveryBatch() { + @Override + public void applyCurrentLogRecoveryBatch() { log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); @@ -105,7 +109,8 @@ class ShardRecoveryCoordinator { * * @param snapshotBytes the serialized snapshot */ - void applyRecoveredSnapshot(final byte[] snapshotBytes) { + @Override + public void applyRecoverySnapshot(final byte[] snapshotBytes) { log.debug("{}: Applyng recovered sbapshot", shardName); DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java new file mode 100644 index 0000000000..c59085d61c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import java.util.concurrent.ExecutionException; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; + +/** + * Participates in raft snapshotting on behalf of a Shard actor. + * + * @author Thomas Pantelis + */ +class ShardSnapshotCohort implements RaftActorSnapshotCohort { + + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); + + private int createSnapshotTransactionCounter; + private final ShardTransactionActorFactory transactionActorFactory; + private final InMemoryDOMDataStore store; + private final Logger log; + private final String logId; + + ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store, + Logger log, String logId) { + this.transactionActorFactory = transactionActorFactory; + this.store = store; + this.log = log; + this.logId = logId; + } + + @Override + public void createSnapshot(ActorRef actorRef) { + // Create a transaction actor. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot. THe transaction actor will + // after processing the CreateSnapshot message. + + ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier( + "createSnapshot" + ++createSnapshotTransactionCounter); + + ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction( + TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION); + + createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef); + } + + @Override + public void applySnapshot(byte[] snapshotBytes) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + + log.info("{}: Applying snapshot", logId); + + try { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + + // delete everything first + transaction.delete(DATASTORE_ROOT); + + // Add everything from the remote node back + transaction.write(DATASTORE_ROOT, node); + syncCommitTransaction(transaction); + } catch (InterruptedException | ExecutionException e) { + log.error("{}: An exception occurred when applying snapshot", logId, e); + } finally { + log.info("{}: Done applying snapshot", logId); + } + + } + + void syncCommitTransaction(final DOMStoreWriteTransaction transaction) + throws ExecutionException, InterruptedException { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 613b3749e0..066f01b092 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * The ShardTransaction Actor represents a remote transaction @@ -54,25 +53,22 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering protected static final boolean SERIALIZED_REPLY = true; private final ActorRef shardActor; - private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; private final short clientTxVersion; - protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - ShardStats shardStats, String transactionID, short clientTxVersion) { + protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID, + short clientTxVersion) { super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; - this.schemaContext = schemaContext; this.shardStats = shardStats; this.transactionID = transactionID; this.clientTxVersion = clientTxVersion; } public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats, - String transactionID, short txnClientVersion) { - return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, + DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { + return Props.create(new ShardTransactionCreator(transaction, shardActor, datastoreContext, shardStats, transactionID, txnClientVersion)); } @@ -86,10 +82,6 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return transactionID; } - protected SchemaContext getSchemaContext() { - return schemaContext; - } - protected short getClientTxVersion() { return clientTxVersion; } @@ -161,19 +153,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering final DOMStoreTransaction transaction; final ActorRef shardActor; - final SchemaContext schemaContext; final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; final short txnClientVersion; ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, DatastoreContext datastoreContext, - ShardStats shardStats, String transactionID, short txnClientVersion) { + DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { this.transaction = transaction; this.shardActor = shardActor; this.shardStats = shardStats; - this.schemaContext = schemaContext; this.datastoreContext = datastoreContext; this.transactionID = transactionID; this.txnClientVersion = txnClientVersion; @@ -184,13 +173,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering ShardTransaction tx; if(transaction instanceof DOMStoreReadWriteTransaction) { tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + shardActor, shardStats, transactionID, txnClientVersion); } else if(transaction instanceof DOMStoreReadTransaction) { tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - schemaContext, shardStats, transactionID, txnClientVersion); + shardStats, transactionID, txnClientVersion); } else { tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + shardActor, shardStats, transactionID, txnClientVersion); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 8ba613958a..a4c97e8ab9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -27,14 +27,12 @@ public class ShardTransactionChain extends AbstractUntypedActor { private final DOMStoreTransactionChain chain; private final DatastoreContext datastoreContext; - private final SchemaContext schemaContext; private final ShardStats shardStats; - public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext, - DatastoreContext datastoreContext, ShardStats shardStats) { + public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + ShardStats shardStats) { this.chain = chain; this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; this.shardStats = shardStats; } @@ -61,22 +59,19 @@ public class ShardTransactionChain extends AbstractUntypedActor { TransactionProxy.TransactionType.READ_ONLY.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else { throw new IllegalArgumentException ( @@ -94,8 +89,7 @@ public class ShardTransactionChain extends AbstractUntypedActor { public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext, DatastoreContext datastoreContext, ShardStats shardStats) { - return Props.create(new ShardTransactionChainCreator(chain, schemaContext, - datastoreContext, shardStats)); + return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats)); } private static class ShardTransactionChainCreator implements Creator { @@ -103,21 +97,19 @@ public class ShardTransactionChain extends AbstractUntypedActor { final DOMStoreTransactionChain chain; final DatastoreContext datastoreContext; - final SchemaContext schemaContext; final ShardStats shardStats; - ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext, - DatastoreContext datastoreContext, ShardStats shardStats) { + ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + ShardStats shardStats) { this.chain = chain; this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; this.shardStats = shardStats; } @Override public ShardTransactionChain create() throws Exception { - return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats); + return new ShardTransactionChain(chain, datastoreContext, shardStats); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java new file mode 100644 index 0000000000..9637646fc5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; + +/** + * A factory for creating ShardTransaction actors. + * + * @author Thomas Pantelis + */ +class ShardTransactionActorFactory { + + private final DOMTransactionFactory domTransactionFactory; + private final DatastoreContext datastoreContext; + private final String txnDispatcherPath; + private final ShardStats shardMBean; + private final UntypedActorContext actorContext; + private final ActorRef shardActor; + + ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext, + String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) { + this.domTransactionFactory = domTransactionFactory; + this.datastoreContext = datastoreContext; + this.txnDispatcherPath = txnDispatcherPath; + this.shardMBean = shardMBean; + this.actorContext = actorContext; + this.shardActor = shardActor; + } + + ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID, + String transactionChainID, short clientVersion) { + + DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(), + transactionChainID); + + return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean, + transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath), + transactionID.toString()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index d5dcfde803..1d5b1d8e1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -1,6 +1,6 @@ /* - * * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -41,12 +40,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardWriteTransaction extends ShardTransaction { private final MutableCompositeModification compositeModification = new MutableCompositeModification(); + private int totalBatchedModificationsReceived; + private Exception lastBatchedModificationsException; private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } @@ -88,9 +88,29 @@ public class ShardWriteTransaction extends ShardTransaction { modification.apply(transaction); } - getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + totalBatchedModificationsReceived++; + if(batched.isReady()) { + if(lastBatchedModificationsException != null) { + throw lastBatchedModificationsException; + } + + if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) { + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + totalBatchedModificationsReceived, batched.getTotalMessagesSent())); + } + + readyTransaction(transaction, false); + } else { + getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + } } catch (Exception e) { + lastBatchedModificationsException = e; getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + + if(batched.isReady()) { + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index a5a7494e1a..bc6e5f229f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; -import java.util.Collection; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import scala.concurrent.Future; @@ -33,6 +32,4 @@ interface TransactionContext { void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture); void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture); - - void copyRecordedOperationFutures(Collection> target); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c61682d8ef..c722918c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -11,18 +12,14 @@ import akka.actor.ActorSelection; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; -import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; @@ -49,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; + private int totalBatchedModificationsSent; protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, @@ -93,90 +91,56 @@ public class TransactionContextImpl extends AbstractTransactionContext { @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", - getIdentifier(), recordedOperationCount()); + LOG.debug("Tx {} readyTransaction called", getIdentifier()); - // Send the remaining batched modifications if any. + // Send the remaining batched modifications, if any, with the ready flag set. - sendAndRecordBatchedModifications(); + Future lastModificationsFuture = sendBatchedModifications(true); - // Send the ReadyTransaction message to the Tx actor. - - Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); - - return combineRecordedOperationsFutures(readyReplyFuture); + return transformReadyReply(lastModificationsFuture); } - protected Future combineRecordedOperationsFutures(final Future withLastReplyFuture) { - // Combine all the previously recorded put/merge/delete operation reply Futures and the - // ReadyTransactionReply Future into one Future. If any one fails then the combined - // Future will fail. We need all prior operations and the ready operation to succeed - // in order to attempt commit. - - List> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1); - copyRecordedOperationFutures(futureList); - futureList.add(withLastReplyFuture); - - Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, - actorContext.getClientDispatcher()); - - // Transform the combined Future into a Future that returns the cohort actor path from - // the ReadyTransactionReply. That's the end result of the ready operation. + protected Future transformReadyReply(final Future readyReplyFuture) { + // Transform the last reply Future into a Future that returns the cohort actor path from + // the last reply message. That's the end result of the ready operation. - return combinedFutures.transform(new Mapper, ActorSelection>() { + return readyReplyFuture.transform(new Mapper() { @Override - public ActorSelection checkedApply(Iterable notUsed) { - LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", - getIdentifier()); - - // At this point all the Futures succeeded and we need to extract the cohort - // actor path from the ReadyTransactionReply. For the recorded operations, they - // don't return any data so we're only interested that they completed - // successfully. We could be paranoid and verify the correct reply types but - // that really should never happen so it's not worth the overhead of - // de-serializing each reply. - - // Note the Future get call here won't block as it's complete. - Object serializedReadyReply = withLastReplyFuture.value().get().get(); - if (serializedReadyReply instanceof ReadyTransactionReply) { - return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply instanceof BatchedModificationsReply) { - return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = deserializeCohortPath(reply.getCohortPath()); - return actorContext.actorSelection(cohortPath); - } else { - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); + public ActorSelection checkedApply(Object serializedReadyReply) { + LOG.debug("Tx {} readyTransaction", getIdentifier()); + + // At this point the ready operation succeeded and we need to extract the cohort + // actor path from the reply. + if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { + ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); } + + // Throwing an exception here will fail the Future. + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + getIdentifier(), serializedReadyReply.getClass())); } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } - protected String deserializeCohortPath(String cohortPath) { - return cohortPath; + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { + return readyTxReply.getCohortPath(); + } + + private BatchedModifications newBatchedModifications() { + return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId); } private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + batchedModifications = newBatchedModifications(); } batchedModifications.addModification(modification); if(batchedModifications.getModifications().size() >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { - sendAndRecordBatchedModifications(); - } - } - - private void sendAndRecordBatchedModifications() { - Future sentFuture = sendBatchedModifications(); - if(sentFuture != null) { - recordOperationFuture(sentFuture); + sendBatchedModifications(); } } @@ -186,17 +150,25 @@ public class TransactionContextImpl extends AbstractTransactionContext { protected Future sendBatchedModifications(boolean ready) { Future sent = null; - if(batchedModifications != null) { + if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { + if(batchedModifications == null) { + batchedModifications = newBatchedModifications(); + } + if(LOG.isDebugEnabled()) { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); } batchedModifications.setReady(ready); + batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); sent = executeOperationAsync(batchedModifications); - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + if(ready) { + batchedModifications = null; + } else { + batchedModifications = newBatchedModifications(); + } } return sent; @@ -232,7 +204,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - sendAndRecordBatchedModifications(); + sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override @@ -274,7 +246,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - sendAndRecordBatchedModifications(); + sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 59c9298499..388dd9f4bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -16,12 +16,16 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -153,19 +158,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if (transactionContext != null) { - transactionContext.copyRecordedOperationFutures(recordedOperationFutures); - } - } - - return recordedOperationFutures; - } - @VisibleForTesting boolean hasTransactionContext() { for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { @@ -178,6 +170,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>, ReadFailedException> read(final YangInstanceIdentifier path) { @@ -186,21 +182,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> proxyFuture = SettableFuture.create(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); + if(isRootPath(path)){ + readAllData(path, proxyFuture); + } else { + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); + } + }); + + } return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } + private void readAllData(final YangInstanceIdentifier path, + final SettableFuture>> proxyFuture) { + Set allShardNames = actorContext.getConfiguration().getAllShardNames(); + List>>> futures = new ArrayList<>(allShardNames.size()); + + for(String shardName : allShardNames){ + final SettableFuture>> subProxyFuture = SettableFuture.create(); + + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, subProxyFuture); + } + }); + + futures.add(subProxyFuture); + } + + final ListenableFuture>>> future = Futures.allAsList(futures); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), + future.get(), actorContext.getSchemaContext())); + } catch (InterruptedException | ExecutionException e) { + proxyFuture.setException(e); + } + } + }, actorContext.getActorSystem().dispatcher()); + } + @Override public CheckedFuture exists(final YangInstanceIdentifier path) { @@ -409,6 +446,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); @@ -685,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", - getIdentifier(), recordedOperationCount()); - - // Send the remaining batched modifications if any. - - Future lastModificationsFuture = sendBatchedModifications(true); - - return combineRecordedOperationsFutures(lastModificationsFuture); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index c3450333a4..d17497c18c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -45,36 +46,32 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { @Override public void deleteData(YangInstanceIdentifier path) { - recordOperationFuture(executeOperationAsync( - new DeleteData(path, getRemoteTransactionVersion()))); + executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion())); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - recordOperationFuture(executeOperationAsync( - new MergeData(path, data, getRemoteTransactionVersion()))); + executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion())); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - recordOperationFuture(executeOperationAsync( - new WriteData(path, data, getRemoteTransactionVersion()))); + executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion())); } @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", - getIdentifier(), recordedOperationCount()); + LOG.debug("Tx {} readyTransaction called", getIdentifier()); // Send the ReadyTransaction message to the Tx actor. Future lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); - return combineRecordedOperationsFutures(lastReplyFuture); + return transformReadyReply(lastReplyFuture); } @Override - protected String deserializeCohortPath(String cohortPath) { + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { // In base Helium we used to return the local path of the actor which represented // a remote ThreePhaseCommitCohort. The local path would then be converted to // a remote path using this resolvePath method. To maintain compatibility with @@ -83,9 +80,9 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { // we could remove this code to resolvePath and just use the cohortPath as the // resolved cohortPath if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { - return getActorContext().resolvePath(transactionPath, cohortPath); + return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath()); } - return cohortPath; + return readyTxReply.getCohortPath(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index a9ce94b033..86f96f57d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement private static final long serialVersionUID = 1L; private boolean ready; + private int totalMessagesSent; private String transactionID; private String transactionChainID; @@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement this.ready = ready; } + public int getTotalMessagesSent() { + return totalMessagesSent; + } + + public void setTotalMessagesSent(int totalMessagesSent) { + this.totalMessagesSent = totalMessagesSent; + } + public String getTransactionID() { return transactionID; } @@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement transactionID = in.readUTF(); transactionChainID = in.readUTF(); ready = in.readBoolean(); + totalMessagesSent = in.readInt(); } @Override @@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement out.writeUTF(transactionID); out.writeUTF(transactionChainID); out.writeBoolean(ready); + out.writeInt(totalMessagesSent); } @Override @@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready) - .append(", modifications size=").append(getModifications().size()).append("]"); + builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=") + .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=") + .append(totalMessagesSent).append(", modifications size=").append(getModifications().size()) + .append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java index a10c6ac3fb..895de3a626 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java @@ -19,11 +19,7 @@ import java.io.ObjectOutput; public class BatchedModificationsReply extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; - private static final byte COHORT_PATH_NOT_PRESENT = 0; - private static final byte COHORT_PATH_PRESENT = 1; - private int numBatched; - private String cohortPath; public BatchedModificationsReply() { } @@ -32,40 +28,20 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { this.numBatched = numBatched; } - public BatchedModificationsReply(int numBatched, String cohortPath) { - this.numBatched = numBatched; - this.cohortPath = cohortPath; - } - public int getNumBatched() { return numBatched; } - public String getCohortPath() { - return cohortPath; - } - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); numBatched = in.readInt(); - - if(in.readByte() == COHORT_PATH_PRESENT) { - cohortPath = in.readUTF(); - } } @Override public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); out.writeInt(numBatched); - - if(cohortPath != null) { - out.writeByte(COHORT_PATH_PRESENT); - out.writeUTF(cohortPath); - } else { - out.writeByte(COHORT_PATH_NOT_PRESENT); - } } @Override @@ -76,8 +52,7 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=") - .append(cohortPath).append("]"); + builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 38886c9a58..0f87243059 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -20,9 +20,9 @@ public class ForwardedReadyTransaction { private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; private final boolean returnSerialized; - private final int txnClientVersion; + private final short txnClientVersion; - public ForwardedReadyTransaction(String transactionID, int txnClientVersion, + public ForwardedReadyTransaction(String transactionID, short txnClientVersion, DOMStoreThreePhaseCommitCohort cohort, Modification modification, boolean returnSerialized) { this.transactionID = transactionID; @@ -48,7 +48,7 @@ public class ForwardedReadyTransaction { return returnSerialized; } - public int getTxnClientVersion() { + public short getTxnClientVersion() { return txnClientVersion; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java index 09617abde9..8d617d0ba7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +@Deprecated public class ReadyTransaction implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransaction.class; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java index 282e23ed3b..b25a5ddf29 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java @@ -8,15 +8,29 @@ package org.opendaylight.controller.cluster.datastore.messages; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -public class ReadyTransactionReply implements SerializableMessage { +public class ReadyTransactionReply extends VersionedExternalizableMessage { + private static final long serialVersionUID = 1L; + public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class; - private final String cohortPath; + private String cohortPath; + + public ReadyTransactionReply() { + } public ReadyTransactionReply(String cohortPath) { + this(cohortPath, DataStoreVersions.CURRENT_VERSION); + } + + public ReadyTransactionReply(String cohortPath, short version) { + super(version); this.cohortPath = cohortPath; } @@ -25,16 +39,38 @@ public class ReadyTransactionReply implements SerializableMessage { } @Override - public ShardTransactionMessages.ReadyTransactionReply toSerializable() { - return ShardTransactionMessages.ReadyTransactionReply.newBuilder() - .setActorPath(cohortPath) - .build(); + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + cohortPath = in.readUTF(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + out.writeUTF(cohortPath); + } + + @Override + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { + return this; + } else { + return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build(); + } } public static ReadyTransactionReply fromSerializable(Object serializable) { - ShardTransactionMessages.ReadyTransactionReply o = - (ShardTransactionMessages.ReadyTransactionReply) serializable; + if(serializable instanceof ReadyTransactionReply) { + return (ReadyTransactionReply)serializable; + } else { + ShardTransactionMessages.ReadyTransactionReply o = + (ShardTransactionMessages.ReadyTransactionReply) serializable; + return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION); + } + } - return new ReadyTransactionReply(o.getActorPath()); + public static boolean isSerializedType(Object message) { + return message instanceof ReadyTransactionReply || + message instanceof ShardTransactionMessages.ReadyTransactionReply; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index f53368d886..17d988005f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -540,6 +540,10 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + public Configuration getConfiguration() { + return configuration; + } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ return ask(actorRef, message, timeout); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java new file mode 100644 index 0000000000..eb1307897a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class NormalizedNodeAggregator { + + private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService(); + + private final YangInstanceIdentifier rootIdentifier; + private final List>> nodes; + private final InMemoryDOMDataStore dataStore; + + NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List>> nodes, + SchemaContext schemaContext){ + + this.rootIdentifier = rootIdentifier; + this.nodes = nodes; + this.dataStore = new InMemoryDOMDataStore("aggregator", executorService); + this.dataStore.onGlobalContextUpdated(schemaContext); + } + + /** + * Combine data from all the nodes in the list into a tree with root as rootIdentifier + * + * @param nodes + * @param schemaContext + * @return + * @throws ExecutionException + * @throws InterruptedException + */ + public static Optional> aggregate(YangInstanceIdentifier rootIdentifier, + List>> nodes, + SchemaContext schemaContext) + throws ExecutionException, InterruptedException { + return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate(); + } + + private Optional> aggregate() throws ExecutionException, InterruptedException { + return combine().getRootNode(); + } + + private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException { + DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction(); + + for(Optional> node : nodes) { + if(node.isPresent()) { + domStoreWriteTransaction.merge(rootIdentifier, node.get()); + } + } + DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready(); + ready.canCommit().get(); + ready.preCommit().get(); + ready.commit().get(); + + return this; + } + + private Optional> getRootNode() throws InterruptedException, ExecutionException { + DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> read = + readTransaction.read(rootIdentifier); + + return read.get(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index c6c5486ee3..6a1e12a96b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -204,10 +203,6 @@ public abstract class AbstractTransactionProxyTest { return argThat(matcher); } - protected Future readySerializedTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); - } - protected Future readyTxReply(String path) { return Futures.successful((Object)new ReadyTransactionReply(path)); } @@ -250,10 +245,8 @@ public abstract class AbstractTransactionProxyTest { eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } - protected void expectBatchedModificationsReady(ActorRef actorRef, int count) { - Future replyFuture = Futures.successful( - new BatchedModificationsReply(count, actorRef.path().toString())); - doReturn(replyFuture).when(mockActorContext).executeOperationAsync( + protected void expectBatchedModificationsReady(ActorRef actorRef) { + doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } @@ -267,11 +260,6 @@ public abstract class AbstractTransactionProxyTest { any(ActorSelection.class), isA(BatchedModifications.class)); } - protected void expectReadyTransaction(ActorRef actorRef) { - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - } - protected void expectFailedBatchedModifications(ActorRef actorRef) { doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(BatchedModifications.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e04c1a5d18..e3b82df174 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.pattern.Patterns; +import akka.persistence.SaveSnapshotSuccess; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -436,42 +437,42 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final String transactionID1 = "tx1"; - final String transactionID2 = "tx2"; - final String transactionID3 = "tx3"; + // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - final AtomicReference mockCohort1 = new AtomicReference<>(); - final AtomicReference mockCohort2 = new AtomicReference<>(); - final AtomicReference mockCohort3 = new AtomicReference<>(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) { - if(transactionID.equals(transactionID1)) { - mockCohort1.set(createDelegatingMockCohort("cohort1", actual)); - return mockCohort1.get(); - } else if(transactionID.equals(transactionID2)) { - mockCohort2.set(createDelegatingMockCohort("cohort2", actual)); - return mockCohort2.get(); - } else { - mockCohort3.set(createDelegatingMockCohort("cohort3", actual)); - return mockCohort3.get(); - } - } - }; + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification3); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Send a BatchedModifications message for the first transaction. + // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class); - assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath()); - assertEquals("getNumBatched", 1, batchedReply.getNumBatched()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( + expectMsgClass(duration, ReadyTransactionReply.class)); + assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); // Send the CanCommitTransaction message for the first Tx. @@ -480,16 +481,15 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send BatchedModifications for the next 2 Tx's. + // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. @@ -582,16 +582,16 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get()); - inOrder.verify(mockCohort1.get()).canCommit(); - inOrder.verify(mockCohort1.get()).preCommit(); - inOrder.verify(mockCohort1.get()).commit(); - inOrder.verify(mockCohort2.get()).canCommit(); - inOrder.verify(mockCohort2.get()).preCommit(); - inOrder.verify(mockCohort2.get()).commit(); - inOrder.verify(mockCohort3.get()).canCommit(); - inOrder.verify(mockCohort3.get()).preCommit(); - inOrder.verify(mockCohort3.get()).commit(); + InOrder inOrder = inOrder(cohort1, cohort2, cohort3); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort1).commit(); + inOrder.verify(cohort2).canCommit(); + inOrder.verify(cohort2).preCommit(); + inOrder.verify(cohort2).commit(); + inOrder.verify(cohort3).canCommit(); + inOrder.verify(cohort3).preCommit(); + inOrder.verify(cohort3).commit(); // Verify data in the data store. @@ -669,7 +669,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -728,7 +728,7 @@ public class ShardTest extends AbstractShardTest { YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. @@ -810,14 +810,24 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + + // Setup a simulated transactions with a mock cohort. + String transactionID = "tx"; + MutableCompositeModification modification = new MutableCompositeModification(); + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + TestModel.TEST_PATH, containerNode, modification); + FiniteDuration duration = duration("5 seconds"); - // Send a BatchedModifications to start a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -831,6 +841,11 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -864,7 +879,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -919,7 +934,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -958,40 +973,34 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 mock cohorts. The first one fails in the commit phase. + // Setup 2 simulated transactions with mock cohorts. The first one fails in the + // commit phase. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Send BatchedModifications to start and ready each transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1044,27 +1053,19 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); - // Send BatchedModifications to start and ready a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1099,24 +1100,16 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - - // Send BatchedModifications to start and ready a transaction. - - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1160,9 +1153,14 @@ public class ShardTest extends AbstractShardTest { } }; - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), + modification, preCommit); + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( @@ -1196,26 +1194,42 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create and ready the 1st Tx - will timeout + // Create 1st Tx - will timeout String transactionID1 = "tx1"; - shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification1); - // Create and ready the 2nd Tx + // Create 2nd Tx - String transactionID2 = "tx2"; + String transactionID2 = "tx3"; + MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - shard.tell(newBatchedModifications(transactionID2, listNodePath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), + modification2); + + // Ready the Tx's + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1252,23 +1266,38 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - // Send a BatchedModifications to start transactions and ready them. + // Ready the Tx's - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. @@ -1313,37 +1342,30 @@ public class ShardTest extends AbstractShardTest { // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - - // Send BatchedModifications to start and ready each transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1389,6 +1411,8 @@ public class ShardTest extends AbstractShardTest { @SuppressWarnings("serial") public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ + final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + final AtomicReference savedSnapshot = new AtomicReference<>(); class TestPersistentDataProvider extends DelegatingPersistentDataProvider { TestPersistentDataProvider(DataPersistenceProvider delegate) { @@ -1405,8 +1429,6 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); new ShardTestKit(getSystem()) {{ - final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - class TestShard extends Shard { protected TestShard(ShardIdentifier name, Map peerAddresses, @@ -1416,9 +1438,12 @@ public class ShardTest extends AbstractShardTest { } @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); + public void handleCommand(Object message) { + super.handleCommand(message); + + if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) { + latch.get().countDown(); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index c3fef611e3..21fb55fcf1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -70,8 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -100,8 +99,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -130,8 +128,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -160,8 +157,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -193,8 +189,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -231,8 +226,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -264,8 +258,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index e63ace3e2c..c9335f378a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; import akka.actor.ActorRef; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; @@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -97,7 +100,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name, short version) { Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(), - testSchemaContext, datastoreContext, shardStats, "txn", version); + datastoreContext, shardStats, "txn", version); return getSystem().actorOf(props, name); } @@ -409,34 +412,125 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveReadyTransaction() throws Exception { + public void testOnReceiveBatchedModificationsReady() throws Exception { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReady"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(writePath, writeData)); + + transaction.tell(batched, getRef()); + BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class); + assertEquals("getNumBatched", 1, reply.getNumBatched()); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + + @Test(expected=TestException.class) + public void testOnReceiveBatchedModificationsFailure() throws Throwable { + new JavaTestKit(getSystem()) {{ + + DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); + final ActorRef transaction = newTransactionActor(mockWriteTx, + "testOnReceiveBatchedModificationsFailure"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doThrow(new TestException()).when(mockWriteTx).write(path, node); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(path, node)); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + + @Test(expected=IllegalStateException.class) + public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + + @Test + public void testOnReceivePreLithiumReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction"); + "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction().toSerializable(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; // test new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction2"); + "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; } @@ -517,8 +611,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION). @@ -540,4 +633,8 @@ public class ShardTransactionTest extends AbstractActorTest { expectMsgClass(duration("3 seconds"), Terminated.class); }}; } + + public static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index acba775445..026b549028 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); } - batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString())); + batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get()); // Tx 2 should've proceeded to find the primary shard. verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); @@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { Promise readyReplyPromise1 = akka.dispatch.Futures.promise(); doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction(); @@ -205,7 +203,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { writeTx1.ready(); - verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false); + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true); String tx2MemberName = "tx2MemberName"; doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); @@ -247,7 +245,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); } - readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get()); + readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get()); verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), eqCreateTransaction(tx2MemberName, READ_WRITE)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index a247100186..cc9692bfd9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -3,12 +3,12 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -20,11 +20,14 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import com.google.common.base.Optional; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -37,7 +40,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; @@ -45,18 +47,19 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import scala.concurrent.Await; -import scala.concurrent.Future; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Promise; -import scala.concurrent.duration.Duration; @SuppressWarnings("resource") public class TransactionProxyTest extends AbstractTransactionProxyTest { @@ -305,29 +308,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { transactionProxy.exists(TestModel.TEST_PATH); } - private void verifyRecordingOperationFutures(List> futures, - Class... expResultTypes) throws Exception { - assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); - - int i = 0; - for( Future future: futures) { - assertNotNull("Recording operation Future is null", future); - - Class expResultType = expResultTypes[i++]; - if(Throwable.class.isAssignableFrom(expResultType)) { - try { - Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - fail("Expected exception from recording operation Future"); - } catch(Exception e) { - // Expected - } - } else { - assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType, - Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); - } - } - } - @Test public void testWrite() throws Exception { dataStoreContextBuilder.shardBatchedModificationCount(1); @@ -356,8 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); + expectBatchedModificationsReady(actorRef); final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -396,10 +375,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true); } @Test(expected=IllegalStateException.class) @@ -448,7 +424,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } @Test - public void testReadyWithReadWrite() throws Exception { + public void testReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -457,7 +433,34 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedReadData()); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + } + + @Test + public void testReadyWithReadWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -471,31 +474,29 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + verifyBatchedModifications(batchedModifications.get(0), true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent()); } @Test - public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { - dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + public void testReadyWithNoModifications() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModificationsReady(actorRef, 1); + expectBatchedModificationsReady(actorRef); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + transactionProxy.read(TestModel.TEST_PATH); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -503,28 +504,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures()); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, - new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - - verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + verifyBatchedModifications(batchedModifications.get(0), true); } @Test - public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { - dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef, 1); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -536,34 +532,26 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); List batchedModifications = captureBatchedModifications(actorRef); - assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), false, + verifyBatchedModifications(batchedModifications.get(0), true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - verifyBatchedModifications(batchedModifications.get(1), true); - verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test - public void testReadyWithRecordingOperationFailure() throws Exception { + public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectFailedBatchedModifications(actorRef); - - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -575,9 +563,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, TestException.class); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class); + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verifyBatchedModifications(batchedModifications.get(1), true); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test @@ -749,18 +746,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // testing ready - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); @@ -1188,8 +1180,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, shardBatchedModificationCount); - expectReadyTransaction(actorRef); - YangInstanceIdentifier writePath1 = TestModel.TEST_PATH; NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -1234,17 +1224,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); - verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), + verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); - if(optimizedWriteOnly) { - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class); - } else { - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); - } + assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent()); } @Test @@ -1349,8 +1332,80 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { inOrder.verify(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); + } + + @Test + public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException { + + SchemaContext schemaContext = SchemaContextHelper.full(); + Configuration configuration = mock(Configuration.class); + doReturn(configuration).when(mockActorContext).getConfiguration(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames(); + + NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME); + + setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext)); + setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext)); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + + Optional> readOptional = transactionProxy.read( + YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + NormalizedNode normalizedNode = readOptional.get(); + + assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection); + + Collection> collection = (Collection>) normalizedNode.getValue(); + + for(NormalizedNode node : collection){ + assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode); + } + + assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found", + NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null); + + assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME)); + + assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found", + NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null); + + assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME)); + } + + + private void setUpReadData(String shardName, NormalizedNode expectedNode) { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(shardName)); + + doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString()); + + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(txActorRef.path())). + when(mockActorContext).actorSelection(txActorRef.path().toString()); + + doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, TransactionType.READ_ONLY)); + + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build())); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index cc860eafc7..9e1557ae3c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.inOrder; -import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.dispatch.Dispatchers; @@ -246,7 +246,7 @@ public class PreLithiumShardTest extends AbstractShardTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, cohort1, modification1, true), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); @@ -261,11 +261,11 @@ public class PreLithiumShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 2980f83564..4cf8b67ddb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -41,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Future; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -93,6 +95,10 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } + private Future readySerializedTxReply(String path, short version) { + return Futures.successful(new ReadyTransactionReply(path, version).toSerializable()); + } + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); @@ -110,7 +116,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), @@ -170,7 +176,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index c4027ad2a5..1df8e9775b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -46,6 +46,7 @@ public class BatchedModificationsTest { batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); batched.setReady(true); + batched.setTotalMessagesSent(5); BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( (Serializable) batched.toSerializable()); @@ -54,6 +55,7 @@ public class BatchedModificationsTest { assertEquals("getTransactionID", "tx1", clone.getTransactionID()); assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID()); assertEquals("isReady", true, clone.isReady()); + assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent()); assertEquals("getModifications size", 3, clone.getModifications().size()); @@ -91,11 +93,5 @@ public class BatchedModificationsTest { BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone( (Serializable) new BatchedModificationsReply(100).toSerializable()); assertEquals("getNumBatched", 100, clone.getNumBatched()); - assertEquals("getCohortPath", null, clone.getCohortPath()); - - clone = (BatchedModificationsReply) SerializationUtils.clone( - (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable()); - assertEquals("getNumBatched", 50, clone.getNumBatched()); - assertEquals("getCohortPath", "cohort path", clone.getCohortPath()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java new file mode 100644 index 0000000000..db525eafbe --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; + +/** + * Unit tests for ReadyTransactionReply. + * + * @author Thomas Pantelis + */ +public class ReadyTransactionReplyTest { + + @Test + public void testSerialization() { + String cohortPath = "cohort path"; + ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath); + + Object serialized = expected.toSerializable(); + assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass()); + + ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); + assertEquals("getCohortPath", cohortPath, actual.getCohortPath()); + } + + @Test + public void testSerializationWithPreLithiumVersion() throws Exception { + String cohortPath = "cohort path"; + ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION); + + Object serialized = expected.toSerializable(); + assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass()); + + ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion()); + assertEquals("getCohortPath", cohortPath, actual.getCohortPath()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java new file mode 100644 index 0000000000..40d3704d2c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class NormalizedNodeAggregatorTest { + + @Test + public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException { + SchemaContext schemaContext = SchemaContextHelper.full(); + NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME); + + Optional> optional = NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), + Lists.newArrayList( + Optional.>of(getRootNode(expectedNode1, schemaContext)), + Optional.>of(getRootNode(expectedNode2, schemaContext))), + schemaContext); + + + NormalizedNode normalizedNode = optional.get(); + + assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection); + + Collection> collection = (Collection>) normalizedNode.getValue(); + + for(NormalizedNode node : collection){ + assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode); + } + + assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found", + findChildWithQName(collection, TestModel.TEST_QNAME) != null); + + assertEquals(expectedNode1, findChildWithQName(collection, TestModel.TEST_QNAME)); + + assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found", + findChildWithQName(collection, CarsModel.BASE_QNAME) != null); + + assertEquals(expectedNode2, findChildWithQName(collection, CarsModel.BASE_QNAME)); + + } + + public static NormalizedNode getRootNode(NormalizedNode moduleNode, SchemaContext schemaContext) throws ReadFailedException, ExecutionException, InterruptedException { + InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", Executors.newSingleThreadExecutor()); + store.onGlobalContextUpdated(schemaContext); + + DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + + writeTransaction.merge(YangInstanceIdentifier.builder().node(moduleNode.getNodeType()).build(), moduleNode); + + DOMStoreThreePhaseCommitCohort ready = writeTransaction.ready(); + + ready.canCommit().get(); + ready.preCommit().get(); + ready.commit().get(); + + DOMStoreReadTransaction readTransaction = store.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> read = readTransaction.read(YangInstanceIdentifier.builder().build()); + + Optional> nodeOptional = read.checkedGet(); + + return nodeOptional.get(); + } + + public static NormalizedNode findChildWithQName(Collection> collection, QName qName) { + for(NormalizedNode node : collection){ + if(node.getNodeType().equals(qName)){ + return node; + } + } + + return null; + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java index 1dfc607e6f..611303a41a 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractCheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -16,13 +15,17 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile * A {@link Future} used to report the status of an future {@link java.util.concurrent.Future}. */ final class PingPongFuture extends AbstractCheckedFuture { - protected PingPongFuture(final ListenableFuture delegate) { - super(delegate); - } + protected PingPongFuture(final ListenableFuture delegate) { + super(delegate); + } - @Override - protected TransactionCommitFailedException mapException(final Exception e) { - Preconditions.checkArgument(e instanceof TransactionCommitFailedException); - return (TransactionCommitFailedException) e; + @Override + protected TransactionCommitFailedException mapException(final Exception e) { + if (e.getCause() instanceof TransactionCommitFailedException){ + return (TransactionCommitFailedException) e.getCause(); + } else { + return new TransactionCommitFailedException(e.getMessage(), e.getCause(), null); } + } } + diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java index e06f572cad..303f3e6923 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java @@ -31,6 +31,7 @@ import javax.xml.transform.dom.DOMResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.util.OrderedNormalizedNodeWriter; import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -44,7 +45,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; @@ -206,17 +206,15 @@ public class NetconfMessageTransformer implements MessageTransformer editElement : normalized.getValue()) { - normalizedNodeWriter.write(editElement); - } + normalizedNodeWriter = new OrderedNormalizedNodeWriter(normalizedNodeStreamWriter, baseNetconfCtx, schemaPath); + Collection> value = (Collection) normalized.getValue(); + normalizedNodeWriter.write(value); normalizedNodeWriter.flush(); } finally { try { diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java index a3cd3c7afa..51f9e220c5 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java @@ -14,6 +14,7 @@ import com.google.common.util.concurrent.CheckedFuture; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import javax.annotation.Nullable; @@ -32,16 +33,15 @@ import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext; +import org.opendaylight.controller.netconf.util.OrderedNormalizedNodeWriter; import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils; import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory; @@ -211,7 +211,7 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation { final NormalizedNodeStreamWriter nnStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter, schemaContext.getCurrentContext(), rpcOutputPath); - final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(nnStreamWriter); + final OrderedNormalizedNodeWriter nnWriter = new OrderedNormalizedNodeWriter(nnStreamWriter, schemaContext.getCurrentContext(), rpcOutputPath); writeRootElement(xmlWriter, nnWriter, (ContainerNode) data); try { @@ -232,11 +232,10 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation { } } - private void writeRootElement(final XMLStreamWriter xmlWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) { + private void writeRootElement(final XMLStreamWriter xmlWriter, final OrderedNormalizedNodeWriter nnWriter, final ContainerNode data) { try { - for (final DataContainerChild child : data.getValue()) { - nnWriter.write(child); - } + Collection> value = (Collection) data.getValue(); + nnWriter.write(value); nnWriter.flush(); xmlWriter.flush(); } catch (XMLStreamException | IOException e) { diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java b/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java index 32eb08c2e7..040066d91c 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.xml.transform.TransformerException; import org.custommonkey.xmlunit.DetailedDiff; import org.custommonkey.xmlunit.Diff; import org.custommonkey.xmlunit.XMLUnit; @@ -201,6 +202,18 @@ public class RuntimeRpcTest { verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid-control.xml")); } + @Test + public void testSuccesfullContainerInvocation() throws Exception { + RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceSuccesfullInvocation); + + Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-container.xml"); + HandlingPriority priority = rpc.canHandle(rpcDocument); + Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE); + + Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-container-control.xml")); + } + @Test public void testFailedInvocation() throws Exception { RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceFailedInvocation); @@ -232,10 +245,11 @@ public class RuntimeRpcTest { verifyResponse(response, RPC_REPLY_OK); } - private void verifyResponse(Document response, Document template) { + private void verifyResponse(Document response, Document template) throws IOException, TransformerException { DetailedDiff dd = new DetailedDiff(new Diff(response, template)); dd.overrideElementQualifier(new RecursiveElementNameAndTextQualifier()); - assertTrue(dd.similar()); + //we care about order so response has to be identical + assertTrue(dd.identical()); } private RpcDefinition getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) { diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml new file mode 100644 index 0000000000..1c06ea95ff --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml @@ -0,0 +1,27 @@ + + + + + + cont1 input string 1 + + + cont1 input string 2 + + + + + cont2 input string 1 + + + cont2 input string 2 + + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml new file mode 100644 index 0000000000..570d6df2f0 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml @@ -0,0 +1,29 @@ + + + + + + + cont1 input string 1 + + + cont1 input string 2 + + + + + cont2 input string 1 + + + cont2 input string 2 + + + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang index d493840828..6a59cdcf6e 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang @@ -40,5 +40,51 @@ module rpc-test { } } } + + rpc container-rpc { + input { + container cont1 { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + + container cont2 { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + } + + output { + container cont1 { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + + container cont2 { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + } + } } diff --git a/opendaylight/netconf/netconf-util/pom.xml b/opendaylight/netconf/netconf-util/pom.xml index 6f9ebede8d..5d82cf1ccd 100644 --- a/opendaylight/netconf/netconf-util/pom.xml +++ b/opendaylight/netconf/netconf-util/pom.xml @@ -54,6 +54,10 @@ org.opendaylight.yangtools yang-model-api + + org.opendaylight.yangtools + yang-data-api + diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/OrderedNormalizedNodeWriter.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/OrderedNormalizedNodeWriter.java new file mode 100644 index 0000000000..e5d63b0fab --- /dev/null +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/OrderedNormalizedNodeWriter.java @@ -0,0 +1,331 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.util; + +import static org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter.UNKNOWN_SIZE; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterables; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode; +import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode; +import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamAttributeWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode; +import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode; +import org.opendaylight.yangtools.yang.model.api.DataNodeContainer; +import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; +import org.opendaylight.yangtools.yang.model.api.ListSchemaNode; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +//TODO this does not extend NormalizedNodeWriter from yangtools due to api freeze, make this inherit common methods to avoid code duplication +//TODO move this to yangtools, since this is in netconf-util due to api freeze in lithium +public class OrderedNormalizedNodeWriter implements Closeable, Flushable{ + + private final SchemaContext schemaContext; + private final SchemaNode root; + private final NormalizedNodeStreamWriter writer; + + public OrderedNormalizedNodeWriter(NormalizedNodeStreamWriter writer, SchemaContext schemaContext, SchemaPath path) { + this.writer = writer; + this.schemaContext = schemaContext; + this.root = findParentSchemaOnPath(schemaContext, path); + } + + public OrderedNormalizedNodeWriter write(final NormalizedNode node) throws IOException { + if (root == schemaContext) { + return write(node, schemaContext.getDataChildByName(node.getNodeType())); + } + + return write(node, root); + } + + public OrderedNormalizedNodeWriter write(final Collection> nodes) throws IOException { + if (writeChildren(nodes, root, false)) { + return this; + } + + throw new IllegalStateException("It wasn't possible to serialize nodes " + nodes); + + } + + private OrderedNormalizedNodeWriter write(NormalizedNode node, SchemaNode dataSchemaNode) throws IOException { + if (node == null) { + return this; + } + + if (wasProcessedAsCompositeNode(node, dataSchemaNode)) { + return this; + } + + if (wasProcessAsSimpleNode(node)) { + return this; + } + + throw new IllegalStateException("It wasn't possible to serialize node " + node); + } + + private void write(List> nodes, SchemaNode dataSchemaNode) throws IOException { + for (NormalizedNode node : nodes) { + write(node, dataSchemaNode); + } + } + + private OrderedNormalizedNodeWriter writeLeaf(final NormalizedNode node) throws IOException { + if (wasProcessAsSimpleNode(node)) { + return this; + } + + throw new IllegalStateException("It wasn't possible to serialize node " + node); + } + + private boolean writeChildren(final Iterable> children, SchemaNode parentSchemaNode, boolean endParent) throws IOException { + //Augmentations cannot be gotten with node.getChild so create our own structure with augmentations resolved + ArrayListMultimap> qNameToNodes = ArrayListMultimap.create(); + for (NormalizedNode child : children) { + if (child instanceof AugmentationNode) { + qNameToNodes.putAll(resolveAugmentations(child)); + } else { + qNameToNodes.put(child.getNodeType(), child); + } + } + + if (parentSchemaNode instanceof DataNodeContainer) { + if (parentSchemaNode instanceof ListSchemaNode && qNameToNodes.containsKey(parentSchemaNode.getQName())) { + write(qNameToNodes.get(parentSchemaNode.getQName()), parentSchemaNode); + } else { + for (DataSchemaNode schemaNode : ((DataNodeContainer) parentSchemaNode).getChildNodes()) { + write(qNameToNodes.get(schemaNode.getQName()), schemaNode); + } + } + } else if(parentSchemaNode instanceof ChoiceSchemaNode) { + for (ChoiceCaseNode ccNode : ((ChoiceSchemaNode) parentSchemaNode).getCases()) { + for (DataSchemaNode dsn : ccNode.getChildNodes()) { + if (qNameToNodes.containsKey(dsn.getQName())) { + write(qNameToNodes.get(dsn.getQName()), dsn); + } + } + } + } else { + for (NormalizedNode child : children) { + writeLeaf(child); + } + } + if (endParent) { + writer.endNode(); + } + return true; + } + + private ArrayListMultimap> resolveAugmentations(NormalizedNode child) { + final ArrayListMultimap> resolvedAugs = ArrayListMultimap.create(); + for (NormalizedNode node : ((AugmentationNode) child).getValue()) { + if (node instanceof AugmentationNode) { + resolvedAugs.putAll(resolveAugmentations(node)); + } else { + resolvedAugs.put(node.getNodeType(), node); + } + } + return resolvedAugs; + } + + private boolean writeMapEntryNode(final MapEntryNode node, final SchemaNode dataSchemaNode) throws IOException { + if(writer instanceof NormalizedNodeStreamAttributeWriter) { + ((NormalizedNodeStreamAttributeWriter) writer) + .startMapEntryNode(node.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(node.getValue()), node.getAttributes()); + } else { + writer.startMapEntryNode(node.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(node.getValue())); + } + return writeChildren(node.getValue(), dataSchemaNode, true); + } + + private boolean wasProcessAsSimpleNode(final NormalizedNode node) throws IOException { + if (node instanceof LeafSetEntryNode) { + final LeafSetEntryNode nodeAsLeafList = (LeafSetEntryNode)node; + if(writer instanceof NormalizedNodeStreamAttributeWriter) { + ((NormalizedNodeStreamAttributeWriter) writer).leafSetEntryNode(nodeAsLeafList.getValue(), nodeAsLeafList.getAttributes()); + } else { + writer.leafSetEntryNode(nodeAsLeafList.getValue()); + } + return true; + } else if (node instanceof LeafNode) { + final LeafNode nodeAsLeaf = (LeafNode)node; + if(writer instanceof NormalizedNodeStreamAttributeWriter) { + ((NormalizedNodeStreamAttributeWriter) writer).leafNode(nodeAsLeaf.getIdentifier(), nodeAsLeaf.getValue(), nodeAsLeaf.getAttributes()); + } else { + writer.leafNode(nodeAsLeaf.getIdentifier(), nodeAsLeaf.getValue()); + } + return true; + } else if (node instanceof AnyXmlNode) { + final AnyXmlNode anyXmlNode = (AnyXmlNode)node; + writer.anyxmlNode(anyXmlNode.getIdentifier(), anyXmlNode.getValue()); + return true; + } + + return false; + } + + private boolean wasProcessedAsCompositeNode(final NormalizedNode node, SchemaNode dataSchemaNode) throws IOException { + if (node instanceof ContainerNode) { + final ContainerNode n = (ContainerNode) node; + if(writer instanceof NormalizedNodeStreamAttributeWriter) { + ((NormalizedNodeStreamAttributeWriter) writer).startContainerNode(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue()), n.getAttributes()); + } else { + writer.startContainerNode(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + } + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof MapEntryNode) { + return writeMapEntryNode((MapEntryNode) node, dataSchemaNode); + } + if (node instanceof UnkeyedListEntryNode) { + final UnkeyedListEntryNode n = (UnkeyedListEntryNode) node; + writer.startUnkeyedListItem(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof ChoiceNode) { + final ChoiceNode n = (ChoiceNode) node; + writer.startChoiceNode(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof AugmentationNode) { + final AugmentationNode n = (AugmentationNode) node; + writer.startAugmentationNode(n.getIdentifier()); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof UnkeyedListNode) { + final UnkeyedListNode n = (UnkeyedListNode) node; + writer.startUnkeyedList(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof OrderedMapNode) { + final OrderedMapNode n = (OrderedMapNode) node; + writer.startOrderedMapNode(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof MapNode) { + final MapNode n = (MapNode) node; + writer.startMapNode(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + if (node instanceof LeafSetNode) { + //covers also OrderedLeafSetNode for which doesn't exist start* method + final LeafSetNode n = (LeafSetNode) node; + writer.startLeafSet(n.getIdentifier(), OrderedNormalizedNodeWriter.childSizeHint(n.getValue())); + return writeChildren(n.getValue(), dataSchemaNode, true); + } + + return false; + } + + private static final int childSizeHint(final Iterable children) { + return (children instanceof Collection) ? ((Collection) children).size() : UNKNOWN_SIZE; + } + + //TODO similar code is already present in schemaTracker, unify this when this writer is moved back to yangtools + private SchemaNode findParentSchemaOnPath(SchemaContext schemaContext, SchemaPath path) { + SchemaNode current = Preconditions.checkNotNull(schemaContext); + for (final QName qname : path.getPathFromRoot()) { + SchemaNode child; + if(current instanceof DataNodeContainer) { + child = ((DataNodeContainer) current).getDataChildByName(qname); + + if (child == null && current instanceof SchemaContext) { + child = tryFindGroupings((SchemaContext) current, qname).orNull(); + } + + if(child == null && current instanceof SchemaContext) { + child = tryFindNotification((SchemaContext) current, qname) + .or(tryFindRpc(((SchemaContext) current), qname)).orNull(); + } + } else if (current instanceof ChoiceSchemaNode) { + child = ((ChoiceSchemaNode) current).getCaseNodeByName(qname); + } else if (current instanceof RpcDefinition) { + switch (qname.getLocalName()) { + case "input": + child = ((RpcDefinition) current).getInput(); + break; + case "output": + child = ((RpcDefinition) current).getOutput(); + break; + default: + child = null; + break; + } + } else { + throw new IllegalArgumentException(String.format("Schema node %s does not allow children.", current)); + } + current = child; + } + return current; + } + + //TODO this method is already present in schemaTracker, unify this when this writer is moved back to yangtools + private Optional tryFindGroupings(final SchemaContext ctx, final QName qname) { + return Optional. fromNullable(Iterables.find(ctx.getGroupings(), new SchemaNodePredicate(qname), null)); + } + + //TODO this method is already present in schemaTracker, unify this when this writer is moved back to yangtools + private Optional tryFindRpc(final SchemaContext ctx, final QName qname) { + return Optional.fromNullable(Iterables.find(ctx.getOperations(), new SchemaNodePredicate(qname), null)); + } + + //TODO this method is already present in schemaTracker, unify this when this writer is moved back to yangtools + private Optional tryFindNotification(final SchemaContext ctx, final QName qname) { + return Optional.fromNullable(Iterables.find(ctx.getNotifications(), new SchemaNodePredicate(qname), null)); + } + + @Override + public void flush() throws IOException { + writer.flush(); + } + + @Override + public void close() throws IOException { + writer.flush(); + writer.close(); + } + + //TODO this class is already present in schemaTracker, unify this when this writer is moved back to yangtools + private static final class SchemaNodePredicate implements Predicate { + private final QName qname; + + public SchemaNodePredicate(final QName qname) { + this.qname = qname; + } + + @Override + public boolean apply(final SchemaNode input) { + return input.getQName().equals(qname); + } + } +} \ No newline at end of file