<artifactId>${artifactId}-impl</artifactId>
<version>${symbol_dollar}{project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${symbol_dollar}{project.groupId}</groupId>
+ <artifactId>${artifactId}-impl</artifactId>
+ <version>${symbol_dollar}{project.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </dependency>
<dependency>
<groupId>${symbol_dollar}{project.groupId}</groupId>
<artifactId>${artifactId}-api</artifactId>
<yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
<yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
<yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
- <sshd-core.version>0.12.0</sshd-core.version>
+ <sshd-core.version>0.14.0</sshd-core.version>
<jmh.version>0.9.7</jmh.version>
<lmax.version>3.3.0</lmax.version>
</properties>
<provider>/modules/module[type='runtime-generated-mapping'][name='runtime-mapping-singleton']</provider>
</instance>
</service>
+ <service>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-normalized-node-serializer</type>
+ <instance>
+ <name>runtime-mapping-singleton</name>
+ <provider>/modules/module[type='runtime-generated-mapping'][name='runtime-mapping-singleton']</provider>
+ </instance>
+ </service>
<service>
<type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">binding-impl:binding-new-notification-service</type>
<instance>
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
public class ExampleActor extends RaftActor {
private final Map<String, String> state = new HashMap();
- private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
private final Optional<ActorRef> roleChangeNotifier;
public ExampleActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
roleChangeNotifier = createRoleChangeNotifier(id);
}
}
- @Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
@Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.slf4j.Logger;
+
+/**
+ * Implementation of ElectionTerm for the RaftActor.
+ */
+class ElectionTermImpl implements ElectionTerm {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private long currentTerm = 0;
+ private String votedFor = null;
+
+ private final DataPersistenceProvider persistence;
+
+ private final Logger log;
+ private final String logId;
+
+ ElectionTermImpl(DataPersistenceProvider persistence, String logId, Logger log) {
+ this.persistence = persistence;
+ this.logId = logId;
+ this.log = log;
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ @Override
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override public void update(long currentTerm, String votedFor) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Set currentTerm={}, votedFor={}", logId, currentTerm, votedFor);
+ }
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ @Override
+ public void updateAndPersist(long currentTerm, String votedFor){
+ update(currentTerm, votedFor);
+ // FIXME : Maybe first persist then update the state
+ persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance());
+ }
+}
\ No newline at end of file
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
private final String id;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
- private volatile long matchIndex;
+ private long matchIndex;
private long lastReplicatedIndex = -1L;
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
@Override
public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ return matchIndex++;
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+
+/**
+ * An akka Procedure that does nothing.
+ *
+ * @author Thomas Pantelis
+ */
+public class NoopProcedure<T> implements Procedure<T> {
+
+ private static final NoopProcedure<Object> INSTANCE = new NoopProcedure<>();
+
+ private NoopProcedure() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> NoopProcedure<T> instance() {
+ return (NoopProcedure<T>) INSTANCE;
+ }
+
+ @Override
+ public void apply(Object notUsed) {
+ }
+}
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
- new Procedure<ApplyJournalEntries>() {
- @Override
- public void apply(ApplyJournalEntries param) throws Exception {
- }
- };
+ private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
*/
private final RaftActorContextImpl context;
+ private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+
+ private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
/**
* The in-memory journal
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
- private CaptureSnapshot captureSnapshot = null;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
Optional<ConfigParams> configParams) {
context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(),
+ this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
-1, -1, replicatedLog, peerAddresses,
(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
LOG);
LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
- persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
+ persistence().persist(applyEntries, NoopProcedure.instance());
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
persistenceId(), saveSnapshotFailure.cause());
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ context.getSnapshotManager().rollback();
} else if (message instanceof CaptureSnapshot) {
LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
- if(captureSnapshot == null) {
- captureSnapshot = (CaptureSnapshot)message;
- createSnapshot();
- }
+ context.getSnapshotManager().create(createSnapshotProcedure);
- } else if (message instanceof CaptureSnapshotReply){
+ } else if (message instanceof CaptureSnapshotReply) {
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(currentBehavior);
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog.dataSize())
.inMemoryJournalLogSize(replicatedLog.size())
- .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
.lastIndex(replicatedLog.lastIndex())
.lastTerm(replicatedLog.lastTerm())
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!context.isSnapshotCaptureInitiated()){
- raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
- raftContext.getTermInformation().getCurrentTerm());
- raftContext.getReplicatedLog().snapshotCommit();
- } else {
- LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
- persistenceId(), getId());
- }
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
} else if (clientActor != null) {
// Send message for replication
currentBehavior.handleMessage(getSelf(),
context.setConfigParams(configParams);
}
+ public final DataPersistenceProvider persistence() {
+ return delegatingPersistenceProvider.getDelegate();
+ }
+
+ public void setPersistence(DataPersistenceProvider provider) {
+ delegatingPersistenceProvider.setDelegate(provider);
+ }
+
+ protected void setPersistence(boolean persistent) {
+ if(persistent) {
+ setPersistence(new PersistentDataProvider(this));
+ } else {
+ setPersistence(new NonPersistentDataProvider() {
+ /**
+ * The way snapshotting works is,
+ * <ol>
+ * <li> RaftActor calls createSnapshot on the Shard
+ * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+ * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+ * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+ * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+ * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+ * </ol>
+ */
+ @Override
+ public void saveSnapshot(Object o) {
+ // Make saving Snapshot successful
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ });
+ }
+ }
+
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
}
protected void commitSnapshot(long sequenceNumber) {
- context.getReplicatedLog().snapshotCommit();
-
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(sequenceNumber);
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
}
/**
*/
protected abstract void onStateChanged();
- protected abstract DataPersistenceProvider persistence();
-
/**
* Notifier Actor for this RaftActor to notify when a role change happens
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- persistence().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
- // Trim akka journal
- persistence().deleteMessages(sequenceNumber);
- }
-
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
- persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
- }
-
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
- // install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- }
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
-
- if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
- }
-
- captureSnapshot = null;
- context.setSnapshotCaptureInitiated(false);
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
}
protected long getTotalMemory() {
}
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0;
+ private long dataSizeSinceLastSnapshot = 0L;
+
public ReplicatedLogImpl(Snapshot snapshot) {
super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
- if(!hasFollowers()) {
+ if (!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
// due to this the journalSize will never become anything close to the
// snapshot batch count. In fact will mostly be 1.
// as if we were maintaining a real snapshot
dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
}
-
+ long journalSize = replicatedLogEntry.getIndex() + 1;
long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
-
- dataSizeSinceLastSnapshot = 0;
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
- " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- }
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
}
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
}
+
if (callback != null){
callback.apply(replicatedLogEntry);
}
}
}
-
- private class ElectionTermImpl implements ElectionTerm {
- /**
- * Identifier of the actor whose election term information this is
- */
- private long currentTerm = 0;
- private String votedFor = null;
-
- @Override
- public long getCurrentTerm() {
- return currentTerm;
- }
-
- @Override
- public String getVotedFor() {
- return votedFor;
- }
-
- @Override public void update(long currentTerm, String votedFor) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
- }
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
- }
-
- @Override
- public void updateAndPersist(long currentTerm, String votedFor){
- update(currentTerm, votedFor);
- // FIXME : Maybe first persist then update the state
- persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
-
- @Override public void apply(UpdateElectionTerm param)
- throws Exception {
-
- }
- });
- }
- }
-
static class UpdateElectionTerm implements Serializable {
private static final long serialVersionUID = 1L;
private final long currentTerm;
}
}
- protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
- public NonPersistentRaftDataProvider(){
-
- }
+ private class CreateSnapshotProcedure implements Procedure<Void> {
- /**
- * The way snapshotting works is,
- * <ol>
- * <li> RaftActor calls createSnapshot on the Shard
- * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
- * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
- * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
- * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
- * in SaveSnapshotSuccess.
- * </ol>
- * @param o
- */
@Override
- public void saveSnapshot(Object o) {
- // Make saving Snapshot successful
- commitSnapshot(-1L);
+ public void apply(Void aVoid) throws Exception {
+ createSnapshot();
}
}
*/
ConfigParams getConfigParams();
- void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
- boolean isSnapshotCaptureInitiated();
+ SnapshotManager getSnapshotManager();
}
private boolean snapshotCaptureInitiated;
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
- @Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
- }
-
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
peerAddresses.put(peerId, peerAddress);
}
+
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+ private final SnapshotState IDLE = new Idle();
+ private final SnapshotState CAPTURING = new Capturing();
+ private final SnapshotState PERSISTING = new Persisting();
+ private final SnapshotState CREATING = new Creating();
+
+ private final Logger LOG;
+ private final RaftActorContext context;
+ private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+ new LastAppliedTermInformationReader();
+ private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+ new ReplicatedToAllTermInformationReader();
+
+
+ private SnapshotState currentState = IDLE;
+ private CaptureSnapshot captureSnapshot;
+
+ public SnapshotManager(RaftActorContext context, Logger logger) {
+ this.context = context;
+ this.LOG = logger;
+ }
+
+ @Override
+ public boolean isCapturing() {
+ return currentState.isCapturing();
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return currentState.capture(lastLogEntry, replicatedToAllIndex);
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ currentState.create(callback);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ currentState.commit(persistenceProvider, sequenceNumber);
+ }
+
+ @Override
+ public void rollback() {
+ currentState.rollback();
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return currentState.trimLog(desiredTrimIndex, currentBehavior);
+ }
+
+ private boolean hasFollowers(){
+ return context.getPeerAddresses().keySet().size() > 0;
+ }
+
+ private String persistenceId(){
+ return context.getId();
+ }
+
+ private class AbstractSnapshotState implements SnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return false;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ LOG.debug("capture should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ LOG.debug("captureToInstall should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ LOG.debug("create should not be called in state {}", this);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ LOG.debug("persist should not be called in state {}", this);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ LOG.debug("commit should not be called in state {}", this);
+ }
+
+ @Override
+ public void rollback() {
+ LOG.debug("rollback should not be called in state {}", this);
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ LOG.debug("trimLog should not be called in state {}", this);
+ return -1;
+ }
+
+ protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+ }
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+ context.getTermInformation().getCurrentTerm());
+
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ // It's possible a follower was lagging and an install snapshot advanced its match index past
+ // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+ // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+ // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+ // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+ currentBehavior.setReplicatedToAllIndex(tempMin);
+ }
+ return -1;
+ }
+ }
+
+ private class Idle extends AbstractSnapshotState {
+
+ private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ TermInformationReader lastAppliedTermInfoReader =
+ lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+ lastLogEntry, hasFollowers());
+
+ long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+ long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+ TermInformationReader replicatedToAllTermInfoReader =
+ replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+ long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+ long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+ lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+ newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+
+ SnapshotManager.this.currentState = CAPTURING;
+
+ if(targetFollower != null){
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ persistenceId(), captureSnapshot, targetFollower);
+ }
+
+ context.getActor().tell(captureSnapshot, context.getActor());
+
+ return true;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null);
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public String toString() {
+ return "Idle";
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return doTrimLog(desiredTrimIndex, currentBehavior);
+ }
+ }
+
+ private class Capturing extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ try {
+ callback.apply(null);
+ SnapshotManager.this.currentState = CREATING;
+ } catch (Exception e) {
+ LOG.error("Unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Capturing";
+ }
+
+ }
+
+ private class Creating extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(snapshotBytes,
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ persistenceProvider.saveSnapshot(sn);
+
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+ long dataThreshold = totalMemory *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+ persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+ captureSnapshot.getLastAppliedIndex());
+ }
+
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
+ }
+
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ if (context.getId().equals(currentBehavior.getLeaderId())
+ && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
+ }
+
+ captureSnapshot = null;
+ SnapshotManager.this.currentState = PERSISTING;
+ }
+
+ @Override
+ public String toString() {
+ return "Creating";
+ }
+
+ }
+
+ private class Persisting extends AbstractSnapshotState {
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ context.getReplicatedLog().snapshotCommit();
+ persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ persistenceProvider.deleteMessages(sequenceNumber);
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public void rollback() {
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public String toString() {
+ return "Persisting";
+ }
+
+ }
+
+ private static interface TermInformationReader {
+ long getIndex();
+ long getTerm();
+ }
+
+ static class LastAppliedTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+ ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+ if (!hasFollowers) {
+ if(lastLogEntry != null) {
+ index = lastLogEntry.getIndex();
+ term = lastLogEntry.getTerm();
+ }
+ } else if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ } else if(log.getSnapshotIndex() > -1){
+ index = log.getSnapshotIndex();
+ term = log.getSnapshotTerm();
+ }
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+
+ private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+
+ if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ }
+
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+ /**
+ * Should return true when a snapshot is being captured
+ * @return
+ */
+ boolean isCapturing();
+
+ /**
+ * Initiate capture snapshot
+ *
+ * @param lastLogEntry the last entry in the replicated log
+ * @param replicatedToAllIndex the current replicatedToAllIndex
+ *
+ * @return true if capture was started
+ */
+ boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+ /**
+ * Initiate capture snapshot for the purposing of installing that snapshot
+ *
+ * @param lastLogEntry
+ * @param replicatedToAllIndex
+ * @param targetFollower
+ *
+ * @return true if capture was started
+ */
+ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+
+ /**
+ * Create the snapshot
+ *
+ * @param callback a procedure to be called which should create the snapshot
+ */
+ void create(Procedure<Void> callback);
+
+ /**
+ * Persist the snapshot
+ *
+ * @param persistenceProvider
+ * @param snapshotBytes
+ * @param currentBehavior
+ * @param totalMemory
+ */
+ void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+ ,long totalMemory);
+
+ /**
+ * Commit the snapshot by trimming the log
+ *
+ * @param persistenceProvider
+ * @param sequenceNumber
+ */
+ void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+ /**
+ * Rollback the snapshot
+ */
+ void rollback();
+
+ /**
+ * Trim the log
+ *
+ * @param desiredTrimIndex
+ * @return the actual trim index
+ */
+ long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
- logName, snapshotCapturedIndex, lastApplied, tempMin);
- }
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
- } else if(tempMin > getReplicatedToAllIndex()) {
- // It's possible a follower was lagging and an install snapshot advanced its match index past
- // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
- // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
- // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
- // trim the log to the last applied index even if previous entries weren't replicated to all followers.
- setReplicatedToAllIndex(tempMin);
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
votesRequired = getMajorityVoteCount(peers.size());
startNewTerm();
- scheduleElection(electionDuration());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
+
}
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- scheduleElection(electionDuration());
-
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
}
private boolean isLogEntryPresent(long index){
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
TestActorRef<MessageCollectorActor> collectorActor) {
super(id, peerAddresses, Optional.of(config), null);
- dataPersistenceProvider = new PersistentDataProvider();
this.collectorActor = collectorActor;
}
assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
}
+
+ protected String testActorPath(String id){
+ return "akka://test/user" + id;
+ }
}
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
+ private SnapshotManager snapshotManager;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
}
@Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
+ public SnapshotManager getSnapshotManager() {
+ if(this.snapshotManager == null){
+ this.snapshotManager = new SnapshotManager(this, getLogger());
+ }
+ return this.snapshotManager;
}
public void setConfigParams(ConfigParams configParams) {
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
public static class MockRaftActor extends RaftActor {
- protected DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
state = new ArrayList<>();
this.delegate = mock(RaftActor.class);
if(dataPersistenceProvider == null){
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
} else {
- this.dataPersistenceProvider = dataPersistenceProvider;
+ setPersistence(dataPersistenceProvider);
}
}
}
}
+
+ public void waitUntilLeader(){
+ for(int i = 0;i < 10; i++){
+ if(isLeader()){
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
public List<Object> getState() {
return state;
}
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+ DataPersistenceProvider dataPersistenceProvider){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+ }
+
+
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
delegate.onStateChanged();
}
- @Override
- protected DataPersistenceProvider persistence() {
- return this.dataPersistenceProvider;
- }
-
@Override
protected Optional<ActorRef> getRoleChangeNotifier() {
return Optional.fromNullable(roleChangeNotifier);
assertEquals("remove log entries", 1, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+ mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
assertEquals("remove log entries", 0, replicatedLog.size());
- mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+ mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
- verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
}
};
}
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
}
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+ new MockRaftActorContext.MockPayload("D")), -1);
+
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
- mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+ mockRaftActor.getReplicatedLog().append(lastEntry);
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
long replicatedToAllIndex = 1;
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+ mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
verify(mockRaftActor.delegate).createSnapshot();
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("D")), 1);
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
}
@Test
- public void testRaftRoleChangeNotifier() throws Exception {
+ public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
Props.create(MessageCollectorActor.class));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
long heartBeatInterval = 100;
config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
- config.setElectionTimeoutFactor(1);
+ config.setElectionTimeoutFactor(20);
String persistenceId = factory.generateActorId("notifier-");
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+ Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
+ new NonPersistentDataProvider()), persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
+
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
}};
}
+ @Test
+ public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
+ String persistenceId = factory.generateActorId("notifier-");
+
+ factory.createActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+
+ List<RoleChanged> matches = null;
+ for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ if(matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
+
+ assertEquals(2, matches.size());
+
+ // check if the notifier got a role change from null to Follower
+ RoleChanged raftRoleChanged = matches.get(0);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertNull(raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Follower to Candidate
+ raftRoleChanged = matches.get(1);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+ }};
+ }
+
@Test
public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
new JavaTestKit(getSystem()) {
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+ leaderActor.getRaftActorContext().getSnapshotManager()
+ .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("x")), 4);
- leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
+ , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+ followerActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("D")), 4);
- followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
assertEquals(6, followerActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
};
}
-
- private static class NonPersistentProvider implements DataPersistenceProvider {
- @Override
- public boolean isRecoveryApplicable() {
- return false;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- try {
- procedure.apply(o);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void saveSnapshot(Object o) {
-
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
-
- }
- }
-
@Test
public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
new JavaTestKit(getSystem()) {{
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();
// Trimming log in this scenario is a no-op
assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(-1, leader.getReplicatedToAllIndex());
}};
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
- DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
Map<String, String> peerAddresses = new HashMap<>();
// Trimming log in this scenario is a no-op
assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(3, leader.getReplicatedToAllIndex());
}};
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
-
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+ @Mock
+ private RaftActorContext mockRaftActorContext;
+
+ @Mock
+ private ConfigParams mockConfigParams;
+
+ @Mock
+ private ReplicatedLog mockReplicatedLog;
+
+ @Mock
+ private DataPersistenceProvider mockDataPersistenceProvider;
+
+ @Mock
+ private RaftActorBehavior mockRaftActorBehavior;
+
+ @Mock
+ private Procedure<Void> mockProcedure;
+
+ private SnapshotManager snapshotManager;
+
+ private TestActorFactory factory;
+
+ private TestActorRef<MessageCollectorActor> actorRef;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+ doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+ doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+ doReturn("123").when(mockRaftActorContext).getId();
+ doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+
+ snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+ factory = new TestActorFactory(getSystem());
+
+ actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+ doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+ }
+
+ @After
+ public void tearDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testConstruction(){
+ assertEquals(false, snapshotManager.isCapturing());
+ }
+
+ @Test
+ public void testCaptureToInstall(){
+
+ // Force capturing toInstall = true
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload()), 0, "follower-1");
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+ actorRef.underlyingActor().clear();
+ }
+
+ @Test
+ public void testCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+ actorRef.underlyingActor().clear();
+
+ }
+
+ @Test
+ public void testIllegalCapture() throws Exception {
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+
+ // This will not cause snapshot capture to start again
+ capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertFalse(capture);
+
+ allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexMinusOne(){
+ doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
+
+ doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses();
+
+ doReturn(8L).when(mockRaftActorContext).getLastApplied();
+
+ MockRaftActorContext.MockReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 3L, 9L, new MockRaftActorContext.MockPayload());
+
+ MockRaftActorContext.MockReplicatedLogEntry lastAppliedEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 2L, 8L, new MockRaftActorContext.MockPayload());
+
+ doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L);
+ doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L);
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(lastLogEntry, -1);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+ assertEquals("getLastTerm", 3L, snapshot.getLastTerm());
+ assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
+ assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
+ assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
+
+ verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
+ }
+
+
+ @Test
+ public void testCreate() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure).apply(null);
+
+ assertEquals("isCapturing", true, snapshotManager.isCapturing());
+ }
+
+ @Test
+ public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateBeforeCapture() throws Exception {
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(0)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateAfterPersist() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ reset(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, never()).apply(null);
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexNotMinus(){
+ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+ doReturn(6L).when(replicatedLogEntry).getTerm();
+ doReturn(9L).when(replicatedLogEntry).getIndex();
+
+ // when replicatedToAllIndex != -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+ assertEquals("getLastTerm", 6L, snapshot.getLastTerm());
+ assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
+ assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
+ assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+ }
+
+
+ @Test
+ public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+ }
+
+ @Test
+ public void testPersistSendInstallSnapshot(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ assertTrue(capture);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+ = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+ SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+ assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ }
+
+ @Test
+ public void testCallingPersistWithoutCaptureWillDoNothing(){
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+ @Test
+ public void testCallingPersistTwiceWillDoNoHarm(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+
+ @Test
+ public void testCommit(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog).snapshotCommit();
+
+ verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+ ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+ verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+ assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+ // config snapShotBatchCount = 10
+ // therefore maxSequenceNumber = 90
+ }
+
+ @Test
+ public void testCommitBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCommitBeforeCapture(){
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCallingCommitMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ }
+
+ @Test
+ public void testRollback(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog).snapshotRollback();
+ }
+
+
+ @Test
+ public void testRollbackBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testRollbackBeforeCapture(){
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testCallingRollbackMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, times(1)).snapshotRollback();
+ }
+
+ @Test
+ public void testTrimLogWhenTrimIndexLessThanLastApplied() {
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ assertEquals("return index", 10L, retIndex);
+
+ verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testTrimLogWhenLastAppliedNotSet() {
+ doReturn(-1L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testTrimLogWhenLastAppliedZero() {
+ doReturn(0L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testTrimLogWhenTrimIndexNotPresent() {
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+
+ doReturn(false).when(mockReplicatedLog).isPresent(10);
+
+ long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ // Trim index is greater than replicatedToAllIndex so should update it.
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L);
+ }
+
+ @Test
+ public void testTrimLogAfterCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+ @Test
+ public void testTrimLogAfterCaptureToInstall(){
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+ @Test
+ public void testLastAppliedTermInformationReader() {
+
+ LastAppliedTermInformationReader reader = new LastAppliedTermInformationReader();
+
+ doReturn(4L).when(mockReplicatedLog).getSnapshotTerm();
+ doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
+
+ ReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(6L, 9L,
+ new MockRaftActorContext.MockPayload());
+
+ // No followers and valid lastLogEntry
+ reader.init(mockReplicatedLog, 1L, lastLogEntry, false);
+
+ assertEquals("getTerm", 6L, reader.getTerm());
+ assertEquals("getIndex", 9L, reader.getIndex());
+
+ // No followers and null lastLogEntry
+ reader.init(mockReplicatedLog, 1L, null, false);
+
+ assertEquals("getTerm", -1L, reader.getTerm());
+ assertEquals("getIndex", -1L, reader.getIndex());
+
+ // Followers and valid originalIndex entry
+ doReturn(new MockRaftActorContext.MockReplicatedLogEntry(5L, 8L,
+ new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L);
+ reader.init(mockReplicatedLog, 8L, lastLogEntry, true);
+
+ assertEquals("getTerm", 5L, reader.getTerm());
+ assertEquals("getIndex", 8L, reader.getIndex());
+
+ // Followers and null originalIndex entry and valid snapshot index
+ reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
+
+ assertEquals("getTerm", 4L, reader.getTerm());
+ assertEquals("getIndex", 7L, reader.getIndex());
+
+ // Followers and null originalIndex entry and invalid snapshot index
+ doReturn(-1L).when(mockReplicatedLog).getSnapshotIndex();
+ reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
+
+ assertEquals("getTerm", -1L, reader.getTerm());
+ assertEquals("getIndex", -1L, reader.getIndex());
+ }
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
assertEquals("getTerm", 1001, reply.getTerm());
}
+ @Test
+ public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ candidate = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
@Override
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
+
public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
public class LeaderTest extends AbstractLeaderTest {
static final String FOLLOWER_ID = "follower";
+ public static final String LEADER_ID = "leader";
private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
+ actorContext.getReplicatedLog().append(entry);
+
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef) {
- return createActorContext("leader", actorRef);
+ return createActorContext(LEADER_ID, actorRef);
}
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setPeerAddresses(leaderPeerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
followerActorContext.setConfigParams(configParams);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTree;
import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
import org.opendaylight.yangtools.yang.binding.BindingMapping;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
import org.opendaylight.yangtools.yang.common.QNameModule;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, SchemaContextListener, AutoCloseable {
+public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, BindingNormalizedNodeSerializer, SchemaContextListener, AutoCloseable {
private final BindingNormalizedNodeCodecRegistry codecRegistry;
private DataNormalizer legacyToNormalized;
return codecRegistry.toYangInstanceIdentifier(binding);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
- final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
- return codecRegistry.toNormalizedNode((InstanceIdentifier) bindingPath, bindingObject);
+ @Override
+ public YangInstanceIdentifier toYangInstanceIdentifier(InstanceIdentifier<?> binding) {
+ return codecRegistry.toYangInstanceIdentifier(binding);
+ }
+ @Override
+ public <T extends DataObject> Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+ InstanceIdentifier<T> path, T data) {
+ return codecRegistry.toNormalizedNode(path, data);
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding) {
- return toNormalizedNode(binding.getKey(),binding.getValue());
+ return toNormalizedNode((InstanceIdentifier) binding.getKey(),binding.getValue());
+ }
+
+ @Override
+ public Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ return codecRegistry.fromNormalizedNode(path, data);
+ }
+
+ @Override
+ public Notification fromNormalizedNodeNotification(SchemaPath path, ContainerNode data) {
+ return codecRegistry.fromNormalizedNodeNotification(path, data);
+ }
+
+ @Override
+ public DataObject fromNormalizedNodeRpcData(SchemaPath path, ContainerNode data) {
+ return codecRegistry.fromNormalizedNodeRpcData(path, data);
+ }
+
+ @Override
+ public InstanceIdentifier<?> fromYangInstanceIdentifier(YangInstanceIdentifier dom) {
+ return codecRegistry.fromYangInstanceIdentifier(dom);
+ }
+
+ @Override
+ public ContainerNode toNormalizedNodeNotification(Notification data) {
+ return codecRegistry.toNormalizedNodeNotification(data);
+ }
+
+ @Override
+ public ContainerNode toNormalizedNodeRpcData(DataContainer data) {
+ return codecRegistry.toNormalizedNodeRpcData(data);
}
/**
base config:module-type;
config:provided-service binding-dom-mapping-service;
config:provided-service sal:binding-codec-tree-factory;
+ config:provided-service sal:binding-normalized-node-serializer;
config:java-name-prefix RuntimeMapping;
}
config:java-class "org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory";
}
+ identity binding-normalized-node-serializer {
+ base "config:service-type";
+ config:java-class "org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer";
+ }
+
identity binding-notification-subscription-service {
base "config:service-type";
config:java-class "org.opendaylight.controller.sal.binding.api.NotificationService";
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+
+/**
+ * A DataPersistenceProvider implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+ private DataPersistenceProvider delegate;
+
+ public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+ this.delegate = delegate;
+ }
+
+ public void setDelegate(DataPersistenceProvider delegate) {
+ this.delegate = delegate;
+ }
+
+ public DataPersistenceProvider getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return delegate.isRecoveryApplicable();
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ delegate.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ delegate.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ delegate.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ delegate.deleteMessages(sequenceNumber);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A DataPersistenceProvider implementation with persistence disabled, essentially a no-op.
+ */
+public class NonPersistentDataProvider implements DataPersistenceProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class);
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ try {
+ procedure.apply(o);
+ } catch (Exception e) {
+ LOG.error("An unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Preconditions;
+
+/**
+ * A DataPersistenceProvider implementation with persistence enabled.
+ */
+public class PersistentDataProvider implements DataPersistenceProvider {
+
+ private final UntypedPersistentActor persistentActor;
+
+ public PersistentDataProvider(UntypedPersistentActor persistentActor) {
+ this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null");
+ }
+
+ @Override
+ public boolean isRecoveryApplicable() {
+ return true;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ persistentActor.persist(o, procedure);
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+ persistentActor.saveSnapshot(o);
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ persistentActor.deleteSnapshots(criteria);
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+ persistentActor.deleteMessages(sequenceNumber);
+ }
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.common.actor;
-import akka.japi.Procedure;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
unhandled(message);
}
-
- protected class PersistentDataProvider implements DataPersistenceProvider {
-
- public PersistentDataProvider(){
-
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return true;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- AbstractUntypedPersistentActor.this.persist(o, procedure);
- }
-
- @Override
- public void saveSnapshot(Object o) {
- AbstractUntypedPersistentActor.this.saveSnapshot(o);
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
- AbstractUntypedPersistentActor.this.deleteSnapshots(criteria);
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
- AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber);
- }
- }
-
- protected class NonPersistentDataProvider implements DataPersistenceProvider {
-
- public NonPersistentDataProvider(){
-
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return false;
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- try {
- procedure.apply(o);
- } catch (Exception e) {
- LOG.error("An unexpected error occurred", e);
- }
- }
-
- @Override
- public void saveSnapshot(Object o) {
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
-
- }
- }
}
public String getNewRole() {
return newRole;
}
+
+ @Override
+ public String toString() {
+ return "RoleChanged{" +
+ "memberId='" + memberId + '\'' +
+ ", oldRole='" + oldRole + '\'' +
+ ", newRole='" + newRole + '\'' +
+ '}';
+ }
}
# failing an operation (eg transaction create and change listener registration).
#shard-initialization-timeout-in-seconds=300
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
#shard-snapshot-batch-count=20000
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
final long startTime = System.nanoTime();
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> preCommitFuture = cohort.preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
+ } else {
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> commitFuture = cohort.commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
private DatastoreContext datastoreContext;
- private DataPersistenceProvider dataPersistenceProvider;
-
private SchemaContext schemaContext;
private int createSnapshotTransactionCounter;
* Coordinates persistence recovery on startup.
*/
private ShardRecoveryCoordinator recoveryCoordinator;
- private List<Object> currentLogRecoveryBatch;
private final DOMTransactionFactory transactionFactory;
this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent())
- ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
.getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+ setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
setTransactionCommitTimeout();
- if(datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
- dataPersistenceProvider = new PersistentDataProvider();
- } else if(!datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof PersistentDataProvider) {
- dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+ setPersistence(true);
+ } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+ setPersistence(false);
}
updateConfigParams(datastoreContext.getShardRaftConfig());
@Override
protected
void startLogRecoveryBatch(final int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
- }
+ recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
}
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
+ recoveryCoordinator.appendRecoveredLogPayload(data);
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
+ recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
}
@Override
protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }
+ recoveryCoordinator.applyCurrentLogRecoveryBatch();
}
@Override
protected void onRecoveryComplete() {
- if(recoveryCoordinator != null) {
- Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
- }
-
- for(DOMStoreWriteTransaction tx: txList) {
- try {
- syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
- }
-
recoveryCoordinator = null;
- currentLogRecoveryBatch = null;
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
}
@Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
- @Override public String persistenceId() {
+ public String persistenceId() {
return this.name;
}
- @VisibleForTesting
- DataPersistenceProvider getDataPersistenceProvider() {
- return dataPersistenceProvider;
- }
-
@VisibleForTesting
ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
}
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
- return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+ return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
}
public static Props props(
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+
} else {
LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
}
if(shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
- if (isReady()) {
+ if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
persistenceId(), type, waitTillReadyCountdownLatch.getCount());
return null;
}
- private boolean isReady() {
+ private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(!info.isShardReady()){
+ if(!info.isShardReadyWithLeaderId()){
isReady = false;
break;
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
*/
class ShardRecoveryCoordinator {
- private static final int TIME_OUT = 10;
-
- private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
- private final SchemaContext schemaContext;
+ private final InMemoryDOMDataStore store;
+ private List<ModificationPayload> currentLogRecoveryBatch;
private final String shardName;
- private final ExecutorService executor;
private final Logger log;
- private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
- String name) {
- this.schemaContext = schemaContext;
+ ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+ this.store = store;
this.shardName = shardName;
this.log = log;
- this.name = name;
-
- executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
}
- /**
- * Submits a batch of journal log entries.
- *
- * @param logEntries the serialized journal log entries
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
- }
+ void startLogRecoveryBatch(int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
- /**
- * Submits a snapshot.
- *
- * @param snapshotBytes the serialized snapshot
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
+ log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
- Collection<DOMStoreWriteTransaction> getTransactions() {
- // Shutdown the executor and wait for task completion.
- executor.shutdown();
-
+ void appendRecoveredLogPayload(Payload payload) {
try {
- if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
- return resultingTxList;
+ if(payload instanceof ModificationPayload) {
+ currentLogRecoveryBatch.add((ModificationPayload) payload);
+ } else if (payload instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationPayload) payload).getModification())));
+ } else if (payload instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationByteStringPayload) payload).getModification())));
} else {
- log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+ log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
- return Collections.emptyList();
}
- private static abstract class ShardRecoveryTask implements Runnable {
-
- final DOMStoreWriteTransaction resultingTx;
-
- ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
- this.resultingTx = resultingTx;
+ private void commitTransaction(DOMStoreWriteTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ try {
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ } catch (Exception e) {
+ log.error("{}: Failed to commit Tx on recovery", shardName, e);
}
}
- private class LogRecoveryTask extends ShardRecoveryTask {
-
- private final List<Object> logEntries;
-
- LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.logEntries = logEntries;
- }
-
- @Override
- public void run() {
- for(int i = 0; i < logEntries.size(); i++) {
- MutableCompositeModification.fromSerializable(
- logEntries.get(i)).apply(resultingTx);
- // Null out to GC quicker.
- logEntries.set(i, null);
+ /**
+ * Applies the current batched log entries to the data store.
+ */
+ void applyCurrentLogRecoveryBatch() {
+ log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ for(ModificationPayload payload: currentLogRecoveryBatch) {
+ try {
+ MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+ } catch (Exception e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
- }
- private class SnapshotRecoveryTask extends ShardRecoveryTask {
+ commitTransaction(writeTx);
+
+ currentLogRecoveryBatch = null;
+ }
- private final byte[] snapshotBytes;
+ /**
+ * Applies a recovered snapshot to the data store.
+ *
+ * @param snapshotBytes the serialized snapshot
+ */
+ void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+ log.debug("{}: Applyng recovered sbapshot", shardName);
- SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.snapshotBytes = snapshotBytes;
- }
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- @Override
- public void run() {
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
+ writeTx.write(YangInstanceIdentifier.builder().build(), node);
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
- }
+ commitTransaction(writeTx);
}
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finishCanCommit", transactionId);
}
- // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
- // their canCommit processing. If any one fails then we'll fail canCommit.
- Future<Iterable<Object>> combinedFuture =
- invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+ // For empty transactions return immediately
+ if(cohorts.size() == 0){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+ }
+ returnFuture.set(Boolean.TRUE);
+ return;
+ }
- combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+ final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if (failure != null) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
}
boolean result = true;
- for(Object response: responses) {
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- result = false;
- break;
- }
- } else {
- LOG.error("Unexpected response type {}", response.getClass());
- returnFuture.setException(new IllegalArgumentException(
- String.format("Unexpected response type %s", response.getClass())));
- return;
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
}
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type %s", response.getClass())));
+ return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+ if(iterator.hasNext() && result){
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(this, actorContext.getClientDispatcher());
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
+ returnFuture.set(Boolean.valueOf(result));
}
- returnFuture.set(Boolean.valueOf(result));
+
}
- }, actorContext.getClientDispatcher());
+ };
+
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
}
leaf shard-journal-recovery-log-batch-size {
- default 5000;
+ default 1000;
type non-zero-uint32-type;
description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery");
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
*
* @author Thomas Pantelis
*/
-public class DOMConcurrentDataCommitCoordinatorTest {
+public class ConcurrentDOMDataBrokerTest {
private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionCommitFailureWithNoShardLeader";
- String shardName = "test-1";
+ String shardName = "default";
// We don't want the shard to become the leader so prevent shard election from completing
// by setting the election timeout, which is based on the heartbeat interval, really high.
@Override
public void run() {
try {
- writeTx.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx.write(TestModel.JUNK_PATH,
+ ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
txCohort.set(writeTx.ready());
} catch(Exception e) {
}
@Test
- public void testRoleChangeNotificationReleaseReady() throws Exception {
+ public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+
+ verify(ready, times(1)).countDown();
+
+ }};
+ }
+
+ @Test
+ public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, null, RaftState.Follower.name()));
+
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+
verify(ready, times(1)).countDown();
}};
}
+
@Test
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
-import akka.japi.Procedure;
import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@Override
public Shard create() throws Exception {
+ // Use a non persistent provider because this test actually invokes persist on the journal
+ // this will cause all other messages to not be queued properly after that.
+ // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+ // it does do a persist)
return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Use MBean for verification
// Committed transaction count should increase as usual
- assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+ assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
// Commit index should advance as we do not have an empty modification
assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
- class DelegatingPersistentDataProvider implements DataPersistenceProvider {
- DataPersistenceProvider delegate;
-
- DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public boolean isRecoveryApplicable() {
- return delegate.isRecoveryApplicable();
- }
-
- @Override
- public <T> void persist(T o, Procedure<T> procedure) {
- delegate.persist(o, procedure);
+ class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+ TestPersistentDataProvider(DataPersistenceProvider delegate) {
+ super(delegate);
}
@Override
public void saveSnapshot(Object o) {
savedSnapshot.set(o);
- delegate.saveSnapshot(o);
- }
-
- @Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
- delegate.deleteSnapshots(criteria);
- }
-
- @Override
- public void deleteMessages(long sequenceNumber) {
- delegate.deleteMessages(sequenceNumber);
+ super.saveSnapshot(o);
}
}
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
- Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- DelegatingPersistentDataProvider delegating;
+ class TestShard extends Shard {
- @Override
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
+ protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ setPersistence(new TestPersistentDataProvider(super.persistence()));
+ }
- return delegating;
- }
+ @Override
+ protected void commitSnapshot(final long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
+ latch.get().countDown();
+ }
- @Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
- }
- };
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+ }
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new TestShard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT);
}
};
NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
- CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
- shard.tell(capture, getRef());
+ // Trigger creation of a snapshot by ensuring
+ RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
latch.set(new CountDownLatch(1));
savedSnapshot.set(null);
- shard.tell(capture, getRef());
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
persistentProps, "testPersistence1");
- assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
nonPersistentProps, "testPersistence2");
- assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
waitUntilLeader(shard);
shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", false,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
assertEquals("isRecoveryApplicable", true,
- shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+ shard.underlyingActor().persistence().isRecoveryApplicable());
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+ Boolean actual = future.get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ assertEquals("canCommit", false, actual);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
@Test(expected = TestException.class)
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+ public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+ "junk");
+
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).build();
public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@Override
public DeviceSources call() throws Exception {
-
- final Set<SourceIdentifier> requiredSources = Sets.newHashSet(Collections2.transform(
- remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION));
-
- // If monitoring is not supported, we will still attempt to create schema, sources might be already provided
final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
- final Set<SourceIdentifier> providedSources = Sets.newHashSet(Collections2.transform(
- availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION));
-
- final Set<SourceIdentifier> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
+ final Set<QName> requiredSources = Sets.newHashSet(remoteSessionCapabilities.getModuleBasedCaps());
+ final Set<QName> providedSources = availableSchemas.getAvailableYangSchemasQNames();
+ final Set<QName> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
if (!requiredSourcesNotProvided.isEmpty()) {
logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
id, requiredSourcesNotProvided);
logger.warn("{}: Attempting to build schema context from required sources", id);
}
-
// Here all the sources reported in netconf monitoring are merged with those reported in hello.
// It is necessary to perform this since submodules are not mentioned in hello but still required.
// This clashes with the option of a user to specify supported yang models manually in configuration for netconf-connector
// and as a result one is not able to fully override yang models of a device. It is only possible to add additional models.
- final Set<SourceIdentifier> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
+ final Set<QName> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
if (!providedSourcesNotRequired.isEmpty()) {
logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
id, providedSourcesNotRequired);
* Contains RequiredSources - sources from capabilities.
*/
private static final class DeviceSources {
- private final Collection<SourceIdentifier> requiredSources;
- private final Collection<SourceIdentifier> providedSources;
+ private final Set<QName> requiredSources;
+ private final Set<QName> providedSources;
- public DeviceSources(final Collection<SourceIdentifier> requiredSources, final Collection<SourceIdentifier> providedSources) {
+ public DeviceSources(final Set<QName> requiredSources, final Set<QName> providedSources) {
this.requiredSources = requiredSources;
this.providedSources = providedSources;
}
- public Collection<SourceIdentifier> getRequiredSources() {
+ public Set<QName> getRequiredSourcesQName() {
return requiredSources;
}
- public Collection<SourceIdentifier> getProvidedSources() {
+ public Set<QName> getProvidedSourcesQName() {
return providedSources;
}
+ public Collection<SourceIdentifier> getRequiredSources() {
+ return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
+
+ public Collection<SourceIdentifier> getProvidedSources() {
+ return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
+
}
/**
// If no more sources, fail
if(requiredSources.isEmpty()) {
- handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
+ final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
+ handleSalInitializationFailure(cause, listener);
+ salFacade.onDeviceFailed(cause);
return;
}
@Override
public void onSuccess(final SchemaContext result) {
logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
- final Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+ final Collection<QName> filteredQNames = Sets.difference(deviceSources.getProvidedSourcesQName(), capabilities.getUnresolvedCapabilites().keySet());
capabilities.addCapabilities(filteredQNames);
capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
}
private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
- final Collection<QName> qNames = new HashSet<>();
- for (final SourceIdentifier source : identifiers) {
- final Optional<QName> qname = getQNameFromSourceIdentifier(source);
- if (qname.isPresent()) {
- qNames.add(qname.get());
+ final Collection<QName> qNames = Collections2.transform(identifiers, new Function<SourceIdentifier, QName>() {
+ @Override
+ public QName apply(final SourceIdentifier sourceIdentifier) {
+ return getQNameFromSourceIdentifier(sourceIdentifier);
}
- }
+ });
+
if (qNames.isEmpty()) {
logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
}
return qNames;
}
- private Optional<QName> getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
- for (final QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
- if (qname.getLocalName().equals(identifier.getName())
- && qname.getFormattedRevision().equals(identifier.getRevision())) {
- return Optional.of(qname);
+ private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
+ // Required sources are all required and provided merged in DeviceSourcesResolver
+ for (final QName qname : deviceSources.getRequiredSourcesQName()) {
+ if(qname.getLocalName().equals(identifier.getName()) == false) {
+ continue;
+ }
+
+ if(identifier.getRevision().equals(SourceIdentifier.NOT_PRESENT_FORMATTED_REVISION) &&
+ qname.getRevision() == null) {
+ return qname;
+ }
+
+ if (qname.getFormattedRevision().equals(identifier.getRevision())) {
+ return qname;
}
}
- throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
+ throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier + " Available: " + deviceSources.getRequiredSourcesQName());
}
}
}
public final static class RemoteYangSchema {
private final QName qname;
- private RemoteYangSchema(final QName qname) {
+ RemoteYangSchema(final QName qname) {
this.qname = qname;
}
import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.io.InputStream;
import java.util.ArrayList;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.Module;
public void testNetconfDeviceMissingSource() throws Exception {
final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final NetconfDeviceCommunicator listener = getListener();
+ final SchemaContext schema = getSchema();
final SchemaContextFactory schemaFactory = getSchemaFactory();
if(((Collection<?>) invocation.getArguments()[0]).size() == 2) {
return Futures.immediateFailedCheckedFuture(schemaResolutionException);
} else {
- return Futures.immediateCheckedFuture(getSchema());
+ return Futures.immediateCheckedFuture(schema);
}
}
}).when(schemaFactory).createSchemaContext(anyCollectionOf(SourceIdentifier.class));
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
- = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
+ = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, new NetconfStateSchemas.NetconfStateSchemasResolver() {
+ @Override
+ public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
+ final Module first = Iterables.getFirst(schema.getModules(), null);
+ final QName qName = QName.create(first.getQNameModule(), first.getName());
+ final NetconfStateSchemas.RemoteYangSchema source1 = new NetconfStateSchemas.RemoteYangSchema(qName);
+ final NetconfStateSchemas.RemoteYangSchema source2 = new NetconfStateSchemas.RemoteYangSchema(QName.create(first.getQNameModule(), "test-module2"));
+ return new NetconfStateSchemas(Sets.newHashSet(source1, source2));
+ }
+ });
+
final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), true);
// Monitoring supported
final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
}
Date revision = module.getRevision();
- Preconditions.checkState(!revisionsByNamespace.containsKey(revision),
- "Duplicate revision %s for namespace %s", revision, namespace);
IdentityMapping identityMapping = revisionsByNamespace.get(revision);
if(identityMapping == null) {
package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator;
import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleIdentifierImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public Set<Module> getModules() {
- return schemaContext.getModules();
+ final Set<Module> modules = Sets.newHashSet(schemaContext.getModules());
+ for (final Module module : schemaContext.getModules()) {
+ modules.addAll(module.getSubmodules());
+ }
+ return modules;
}
@Override
public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) {
- return schemaContext.getModuleSource(moduleIdentifier).get();
+ final Optional<String> moduleSource = schemaContext.getModuleSource(moduleIdentifier);
+ if(moduleSource.isPresent()) {
+ return moduleSource.get();
+ } else {
+ try {
+ return Iterables.find(getModules(), new Predicate<Module>() {
+ @Override
+ public boolean apply(final Module input) {
+ final ModuleIdentifierImpl id = new ModuleIdentifierImpl(input.getName(), Optional.fromNullable(input.getNamespace()), Optional.fromNullable(input.getRevision()));
+ return id.equals(moduleIdentifier);
+ }
+ }).getSource();
+ } catch (final NoSuchElementException e) {
+ throw new IllegalArgumentException("Source for yang module " + moduleIdentifier + " not found", e);
+ }
+ }
}
@Override
import org.opendaylight.controller.netconf.mdsal.connector.MdsalNetconfOperationServiceFactory;
-public class NetconfMdsalMapperModule extends org.opendaylight.controller.config.yang.netconf.mdsal.mapper.AbstractNetconfMdsalMapperModule {
+public class NetconfMdsalMapperModule extends org.opendaylight.controller.config.yang.netconf.mdsal.mapper.AbstractNetconfMdsalMapperModule{
public NetconfMdsalMapperModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
@Override
public java.lang.AutoCloseable createInstance() {
- final MdsalNetconfOperationServiceFactory mdsalNetconfOperationServiceFactory = new MdsalNetconfOperationServiceFactory(getRootSchemaServiceDependency(), getDomBrokerDependency()) {
- @Override
- public void close() throws Exception {
- super.close();
- getMapperAggregatorDependency().onRemoveNetconfOperationServiceFactory(this);
- }
- };
+ final MdsalNetconfOperationServiceFactory mdsalNetconfOperationServiceFactory =
+ new MdsalNetconfOperationServiceFactory(getRootSchemaServiceDependency()) {
+ @Override
+ public void close() throws Exception {
+ super.close();
+ getMapperAggregatorDependency().onRemoveNetconfOperationServiceFactory(this);
+ }
+ };
+ getDomBrokerDependency().registerConsumer(mdsalNetconfOperationServiceFactory);
getMapperAggregatorDependency().onAddNetconfOperationServiceFactory(mdsalNetconfOperationServiceFactory);
return mdsalNetconfOperationServiceFactory;
}
import java.util.Set;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
private final OperationProvider operationProvider;
public MdsalNetconfOperationService(final CurrentSchemaContext schemaContext, final String netconfSessionIdForReporting,
- final DOMDataBroker dataBroker) {
- this.operationProvider = new OperationProvider(netconfSessionIdForReporting, schemaContext, dataBroker);
+ final DOMDataBroker dataBroker, final DOMRpcService rpcService) {
+ this.operationProvider = new OperationProvider(netconfSessionIdForReporting, schemaContext, dataBroker, rpcService);
}
@Override
package org.opendaylight.controller.netconf.mdsal.connector;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.netconf.api.Capability;
import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.capability.BasicCapability;
import org.opendaylight.controller.netconf.util.capability.YangModuleCapability;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.controller.sal.core.api.Consumer;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MdsalNetconfOperationServiceFactory implements NetconfOperationServiceFactory, AutoCloseable {
+public class MdsalNetconfOperationServiceFactory implements NetconfOperationServiceFactory, Consumer, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MdsalNetconfOperationServiceFactory.class);
- private final DOMDataBroker dataBroker;
+ private ConsumerSession session = null;
+ private DOMDataBroker dataBroker = null;
+ private DOMRpcService rpcService = null;
private final CurrentSchemaContext currentSchemaContext;
- public MdsalNetconfOperationServiceFactory(final SchemaService schemaService, final DOMDataBroker domDataBroker) {
+ public MdsalNetconfOperationServiceFactory(final SchemaService schemaService) {
this.currentSchemaContext = new CurrentSchemaContext(Preconditions.checkNotNull(schemaService));
- this.dataBroker = Preconditions.checkNotNull(domDataBroker);
}
@Override
public MdsalNetconfOperationService createService(final String netconfSessionIdForReporting) {
- return new MdsalNetconfOperationService(currentSchemaContext, netconfSessionIdForReporting, dataBroker);
+ Preconditions.checkState(dataBroker != null, "MD-SAL provider not yet initialized");
+ return new MdsalNetconfOperationService(currentSchemaContext, netconfSessionIdForReporting, dataBroker, rpcService);
}
@Override
return transformCapabilities(currentSchemaContext.getCurrentContext());
}
- static Set<Capability> transformCapabilities(final SchemaContext currentContext1) {
+ static Set<Capability> transformCapabilities(final SchemaContext currentContext) {
final Set<Capability> capabilities = new HashSet<>();
// [RFC6241] 8.3. Candidate Configuration Capability
capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
- final SchemaContext currentContext = currentContext1;
final Set<Module> modules = currentContext.getModules();
for (final Module module : modules) {
- if(currentContext.getModuleSource(module).isPresent()) {
- capabilities.add(new YangModuleCapability(module, currentContext.getModuleSource(module).get()));
- } else {
- LOG.warn("Missing source for module {}. This module will not be available from netconf server",
- module);
+ Optional<YangModuleCapability> cap = moduleToCapability(module);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
+ for (final Module submodule : module.getSubmodules()) {
+ cap = moduleToCapability(submodule);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
}
}
return capabilities;
}
+ private static Optional<YangModuleCapability> moduleToCapability(final Module module) {
+ final String source = module.getSource();
+ if(source !=null) {
+ return Optional.of(new YangModuleCapability(module, source));
+ } else {
+ LOG.warn("Missing source for module {}. This module will not be available from netconf server",
+ module);
+ }
+ return Optional.absent();
+ }
+
@Override
public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
return currentSchemaContext.registerCapabilityListener(listener);
}
+ @Override
+ public void onSessionInitiated(ConsumerSession session) {
+ this.session = Preconditions.checkNotNull(session);
+ this.dataBroker = this.session.getService(DOMDataBroker.class);
+ this.rpcService = this.session.getService(DOMRpcService.class);
+ }
+
+ @Override
+ public Collection<ConsumerFunctionality> getConsumerFunctionality() {
+ return Collections.emptySet();
+ }
}
import com.google.common.collect.Sets;
import java.util.Set;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
import org.opendaylight.controller.netconf.mdsal.connector.ops.Commit;
import org.opendaylight.controller.netconf.mdsal.connector.ops.DiscardChanges;
import org.opendaylight.controller.netconf.mdsal.connector.ops.EditConfig;
import org.opendaylight.controller.netconf.mdsal.connector.ops.Lock;
+import org.opendaylight.controller.netconf.mdsal.connector.ops.RuntimeRpc;
import org.opendaylight.controller.netconf.mdsal.connector.ops.Unlock;
import org.opendaylight.controller.netconf.mdsal.connector.ops.get.Get;
import org.opendaylight.controller.netconf.mdsal.connector.ops.get.GetConfig;
private final String netconfSessionIdForReporting;
private final CurrentSchemaContext schemaContext;
private final DOMDataBroker dataBroker;
+ private final DOMRpcService rpcService;
private final TransactionProvider transactionProvider;
- public OperationProvider(final String netconfSessionIdForReporting, final CurrentSchemaContext schemaContext, final DOMDataBroker dataBroker) {
+ public OperationProvider(final String netconfSessionIdForReporting, final CurrentSchemaContext schemaContext,
+ final DOMDataBroker dataBroker, final DOMRpcService rpcService) {
this.netconfSessionIdForReporting = netconfSessionIdForReporting;
this.schemaContext = schemaContext;
this.dataBroker = dataBroker;
- this.transactionProvider = new TransactionProvider(dataBroker, netconfSessionIdForReporting);
-
+ this.rpcService = rpcService;
+ this.transactionProvider = new TransactionProvider(this.dataBroker, netconfSessionIdForReporting);
}
Set<NetconfOperation> getOperations() {
new Get(netconfSessionIdForReporting, schemaContext, transactionProvider),
new GetConfig(netconfSessionIdForReporting, schemaContext, transactionProvider),
new Lock(netconfSessionIdForReporting),
- new Unlock(netconfSessionIdForReporting)
+ new Unlock(netconfSessionIdForReporting),
+ new RuntimeRpc(netconfSessionIdForReporting, schemaContext, rpcService)
);
}
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class Commit extends AbstractLastNetconfOperation{
+public class Commit extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class DiscardChanges extends AbstractLastNetconfOperation{
+public class DiscardChanges extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(DiscardChanges.class);
protected String getOperationName() {
return OPERATION_NAME;
}
+
}
import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider;
import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.yangtools.yang.data.api.ModifyAction;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class EditConfig extends AbstractLastNetconfOperation {
+public class EditConfig extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(EditConfig.class);
protected String getOperationName() {
return OPERATION_NAME;
}
+
}
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class Lock extends AbstractLastNetconfOperation{
+public class Lock extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(Lock.class);
protected String getOperationName() {
return OPERATION_NAME;
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.mdsal.connector.ops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorTag;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext;
+import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils;
+import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Attr;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class RuntimeRpc extends AbstractSingletonNetconfOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RuntimeRpc.class);
+
+ private final CurrentSchemaContext schemaContext;
+ private static final XMLOutputFactory XML_OUTPUT_FACTORY;
+
+ static {
+ XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory();
+ XML_OUTPUT_FACTORY.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
+ }
+
+ private final DOMRpcService rpcService;
+
+ public RuntimeRpc(final String netconfSessionIdForReporting, CurrentSchemaContext schemaContext, DOMRpcService rpcService) {
+ super(netconfSessionIdForReporting);
+ this.schemaContext = schemaContext;
+ this.rpcService = rpcService;
+ }
+
+ @Override
+ protected HandlingPriority canHandle(final String netconfOperationName, final String namespace) {
+ final URI namespaceURI = createNsUri(namespace);
+ final Optional<Module> module = getModule(namespaceURI);
+
+ if (!module.isPresent()) {
+ LOG.debug("Cannot handle rpc: {}, {}", netconfOperationName, namespace);
+ return HandlingPriority.CANNOT_HANDLE;
+ }
+
+ getRpcDefinitionFromModule(module.get(), namespaceURI, netconfOperationName);
+ return HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY;
+
+ }
+
+ @Override
+ protected String getOperationName() {
+ throw new UnsupportedOperationException("Runtime rpc does not have a stable name");
+ }
+
+ private URI createNsUri(final String namespace) {
+ final URI namespaceURI;
+ try {
+ namespaceURI = new URI(namespace);
+ } catch (URISyntaxException e) {
+ // Cannot occur, namespace in parsed XML cannot be invalid URI
+ throw new IllegalStateException("Unable to parse URI " + namespace, e);
+ }
+ return namespaceURI;
+ }
+
+ //this returns module with the newest revision if more then 1 module with same namespace is found
+ private Optional<Module> getModule(final URI namespaceURI) {
+ return Optional.of(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null));
+ }
+
+ private Optional<RpcDefinition> getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) {
+ for (RpcDefinition rpcDef : module.getRpcs()) {
+ if (rpcDef.getQName().getNamespace().equals(namespaceURI)
+ && rpcDef.getQName().getLocalName().equals(name)) {
+ return Optional.of(rpcDef);
+ }
+ }
+ return Optional.absent();
+ }
+
+ @Override
+ protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+
+ final String netconfOperationName = operationElement.getName();
+ final String netconfOperationNamespace;
+ try {
+ netconfOperationNamespace = operationElement.getNamespace();
+ } catch (MissingNameSpaceException e) {
+ LOG.debug("Cannot retrieve netconf operation namespace from message due to ", e);
+ throw new NetconfDocumentedException("Cannot retrieve netconf operation namespace from message",
+ ErrorType.protocol, ErrorTag.unknown_namespace, ErrorSeverity.error);
+ }
+
+ final URI namespaceURI = createNsUri(netconfOperationNamespace);
+ final Optional<Module> moduleOptional = getModule(namespaceURI);
+
+ if (!moduleOptional.isPresent()) {
+ throw new NetconfDocumentedException("Unable to find module in Schema Context with namespace and name : " +
+ namespaceURI + " " + netconfOperationName + schemaContext.getCurrentContext(),
+ ErrorType.application, ErrorTag.bad_element, ErrorSeverity.error);
+ }
+
+ final Optional<RpcDefinition> rpcDefinitionOptional = getRpcDefinitionFromModule(moduleOptional.get(), namespaceURI, netconfOperationName);
+
+ if (!rpcDefinitionOptional.isPresent()) {
+ throw new NetconfDocumentedException("Unable to find RpcDefinition with namespace and name : " + namespaceURI + " " + netconfOperationName,
+ ErrorType.application, ErrorTag.bad_element, ErrorSeverity.error);
+ }
+
+ final RpcDefinition rpcDefinition = rpcDefinitionOptional.get();
+ final SchemaPath schemaPath = SchemaPath.create(Collections.singletonList(rpcDefinition.getQName()), true);
+ final NormalizedNode<?, ?> inputNode = rpcToNNode(operationElement, rpcDefinition.getInput());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcFuture = rpcService.invokeRpc(schemaPath, inputNode);
+ try {
+ final DOMRpcResult result = rpcFuture.checkedGet();
+ if (result.getResult() == null) {
+ return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.of(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
+ }
+ return (Element) transformNormalizedNode(document, result.getResult(), rpcDefinition.getOutput().getPath());
+ } catch (DOMRpcException e) {
+ throw NetconfDocumentedException.wrap(e);
+ }
+ }
+
+ @Override
+ public Document handle(final Document requestMessage,
+ final NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+
+ final XmlElement requestElement = getRequestElementWithCheck(requestMessage);
+
+ final Document document = XmlUtil.newDocument();
+
+ final XmlElement operationElement = requestElement.getOnlyChildElement();
+ final Map<String, Attr> attributes = requestElement.getAttributes();
+
+ final Element response = handle(document, operationElement, subsequentOperation);
+ final Element rpcReply = XmlUtil.createElement(document, XmlNetconfConstants.RPC_REPLY_KEY, Optional.of(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
+
+ if(XmlElement.fromDomElement(response).hasNamespace()) {
+ rpcReply.appendChild(response);
+ } else {
+ final NodeList list = response.getChildNodes();
+ if (list.getLength() == 0) {
+ rpcReply.appendChild(response);
+ } else {
+ while (list.getLength() != 0) {
+ rpcReply.appendChild(list.item(0));
+ }
+ }
+ }
+
+ for (Attr attribute : attributes.values()) {
+ rpcReply.setAttributeNode((Attr) document.importNode(attribute, true));
+ }
+ document.appendChild(rpcReply);
+ return document;
+ }
+
+ //TODO move all occurences of this method in mdsal netconf(and xml factories) to a utility class
+ private Node transformNormalizedNode(final Document document, final NormalizedNode<?, ?> data, final SchemaPath rpcOutputPath) {
+ final DOMResult result = new DOMResult(document.createElement(XmlNetconfConstants.RPC_REPLY_KEY));
+
+ final XMLStreamWriter xmlWriter = getXmlStreamWriter(result);
+
+ final NormalizedNodeStreamWriter nnStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter,
+ schemaContext.getCurrentContext(), rpcOutputPath);
+
+ final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(nnStreamWriter);
+
+ writeRootElement(xmlWriter, nnWriter, (ContainerNode) data);
+ try {
+ nnStreamWriter.close();
+ xmlWriter.close();
+ } catch (IOException | XMLStreamException e) {
+ LOG.warn("Error while closing streams", e);
+ }
+
+ return result.getNode();
+ }
+
+ private XMLStreamWriter getXmlStreamWriter(final DOMResult result) {
+ try {
+ return XML_OUTPUT_FACTORY.createXMLStreamWriter(result);
+ } catch (final XMLStreamException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void writeRootElement(final XMLStreamWriter xmlWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) {
+ try {
+ for (final DataContainerChild<? extends PathArgument, ?> child : data.getValue()) {
+ nnWriter.write(child);
+ }
+ nnWriter.flush();
+ xmlWriter.flush();
+ } catch (XMLStreamException | IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Parses xml element rpc input into normalized node or null if rpc does not take any input
+ * @param oElement rpc xml element
+ * @param input input container schema node, or null if rpc does not take any input
+ * @return parsed rpc into normalized node, or null if input schema is null
+ */
+ @Nullable
+ private NormalizedNode<?, ?> rpcToNNode(final XmlElement oElement, @Nullable final ContainerSchemaNode input) {
+ return input == null ? null : DomToNormalizedNodeParserFactory
+ .getInstance(DomUtils.defaultValueCodecProvider(), schemaContext.getCurrentContext())
+ .getContainerNodeParser()
+ .parse(Collections.singletonList(oElement.getDomElement()), input);
+ }
+
+}
import com.google.common.base.Optional;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class Unlock extends AbstractLastNetconfOperation{
+public class Unlock extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(Unlock.class);
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext;
import org.opendaylight.controller.netconf.mdsal.connector.ops.Datastore;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.sal.connect.netconf.util.InstanceIdToNodes;
import org.opendaylight.yangtools.yang.common.QName;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-public abstract class AbstractGet extends AbstractLastNetconfOperation {
+public abstract class AbstractGet extends AbstractSingletonNetconfOperation {
private static final Logger LOG = LoggerFactory.getLogger(AbstractGet.class);
container dom-broker {
uses config:service-ref {
refine type {
- mandatory false;
- config:required-identity md-sal-dom:dom-async-data-broker;
+ mandatory true;
+ config:required-identity md-sal-dom:dom-broker-osgi-registry;
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.mdsal.connector.ops;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.custommonkey.xmlunit.DetailedDiff;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.custommonkey.xmlunit.examples.RecursiveElementNameAndTextQualifier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorTag;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext;
+import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
+import org.opendaylight.yangtools.yang.parser.builder.impl.BuilderUtils;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+public class RuntimeRpcTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RuntimeRpcTest.class);
+
+ private String sessionIdForReporting = "netconf-test-session1";
+
+ private static Document RPC_REPLY_OK = null;
+
+ static {
+ try {
+ RPC_REPLY_OK = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/runtimerpc-ok-reply.xml");
+ } catch (Exception e) {
+ LOG.debug("unable to load rpc reply ok.", e);
+ RPC_REPLY_OK = XmlUtil.newDocument();
+ }
+ }
+
+ private DOMRpcService rpcServiceVoidInvoke = new DOMRpcService() {
+ @Nonnull
+ @Override
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode<?, ?> input) {
+ return Futures.immediateCheckedFuture((DOMRpcResult) new DefaultDOMRpcResult(null, Collections.<RpcError>emptyList()));
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull T listener) {
+ return null;
+ }
+ };
+
+ private DOMRpcService rpcServiceFailedInvocation = new DOMRpcService() {
+ @Nonnull
+ @Override
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode<?, ?> input) {
+ return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcException("rpc invocation not implemented yet") {
+ });
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull T listener) {
+ return null;
+ }
+ };
+
+ private DOMRpcService rpcServiceSuccesfullInvocation = new DOMRpcService() {
+ @Nonnull
+ @Override
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode<?, ?> input) {
+ Collection<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> children = (Collection) input.getValue();
+ Module module = schemaContext.findModuleByNamespaceAndRevision(type.getLastComponent().getNamespace(), null);
+ RpcDefinition rpcDefinition = getRpcDefinitionFromModule(module, module.getNamespace(), type.getLastComponent().getLocalName());
+ ContainerSchemaNode outputSchemaNode = rpcDefinition.getOutput();
+ ContainerNode node = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(outputSchemaNode.getQName()))
+ .withValue(children).build();
+
+ return Futures.immediateCheckedFuture((DOMRpcResult) new DefaultDOMRpcResult(node));
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull T listener) {
+ return null;
+ }
+ };
+
+ private SchemaContext schemaContext = null;
+ private CurrentSchemaContext currentSchemaContext = null;
+ @Mock
+ private SchemaService schemaService;
+ @Mock
+ private SchemaContextListener listener;
+ @Mock
+ private ListenerRegistration registration;
+
+ @Before
+ public void setUp() throws Exception {
+
+ initMocks(this);
+ doNothing().when(registration).close();
+ doReturn(listener).when(registration).getInstance();
+ doNothing().when(schemaService).addModule(any(Module.class));
+ doNothing().when(schemaService).removeModule(any(Module.class));
+ doReturn(schemaContext).when(schemaService).getGlobalContext();
+ doReturn(schemaContext).when(schemaService).getSessionContext();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((SchemaContextListener) invocationOnMock.getArguments()[0]).onGlobalContextUpdated(schemaContext);
+ return registration;
+ }
+ }).when(schemaService).registerSchemaContextListener(any(SchemaContextListener.class));
+
+ XMLUnit.setIgnoreWhitespace(true);
+ XMLUnit.setIgnoreAttributeOrder(true);
+
+ this.schemaContext = parseSchemas(getYangSchemas());
+ this.currentSchemaContext = new CurrentSchemaContext(schemaService);
+ }
+
+ @Test
+ public void testVoidOutputRpc() throws Exception {
+ RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceVoidInvoke);
+
+ Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-void-output.xml");
+ HandlingPriority priority = rpc.canHandle(rpcDocument);
+ Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
+
+ Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+
+ verifyResponse(response, RPC_REPLY_OK);
+ }
+
+ @Test
+ public void testSuccesfullNonVoidInvocation() throws Exception {
+ RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceSuccesfullInvocation);
+
+ Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid.xml");
+ HandlingPriority priority = rpc.canHandle(rpcDocument);
+ Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
+
+ Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid-control.xml"));
+ }
+
+ @Test
+ public void testFailedInvocation() throws Exception {
+ RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceFailedInvocation);
+
+ Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid.xml");
+ HandlingPriority priority = rpc.canHandle(rpcDocument);
+ Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
+
+ try {
+ rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ fail("should have failed with rpc invocation not implemented yet");
+ } catch (NetconfDocumentedException e) {
+ assertTrue(e.getErrorType() == ErrorType.application);
+ assertTrue(e.getErrorSeverity() == ErrorSeverity.error);
+ assertTrue(e.getErrorTag() == ErrorTag.operation_failed);
+ }
+ }
+
+ @Test
+ public void testVoidInputOutputRpc() throws Exception {
+ RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceVoidInvoke);
+
+ Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-void-input-output.xml");
+ HandlingPriority priority = rpc.canHandle(rpcDocument);
+ Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
+
+ Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+
+ verifyResponse(response, RPC_REPLY_OK);
+ }
+
+ private void verifyResponse(Document response, Document template) {
+ DetailedDiff dd = new DetailedDiff(new Diff(response, template));
+ dd.overrideElementQualifier(new RecursiveElementNameAndTextQualifier());
+ assertTrue(dd.similar());
+ }
+
+ private RpcDefinition getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) {
+ for (RpcDefinition rpcDef : module.getRpcs()) {
+ if (rpcDef.getQName().getNamespace().equals(namespaceURI)
+ && rpcDef.getQName().getLocalName().equals(name)) {
+ return rpcDef;
+ }
+ }
+
+ return null;
+
+ }
+
+ private Collection<InputStream> getYangSchemas() {
+ final List<String> schemaPaths = Arrays.asList("/yang/mdsal-netconf-rpc-test.yang");
+ final List<InputStream> schemas = new ArrayList<>();
+
+ for (String schemaPath : schemaPaths) {
+ InputStream resourceAsStream = getClass().getResourceAsStream(schemaPath);
+ schemas.add(resourceAsStream);
+ }
+
+ return schemas;
+ }
+
+ private SchemaContext parseSchemas(Collection<InputStream> schemas) throws IOException, YangSyntaxErrorException {
+ final YangParserImpl parser = new YangParserImpl();
+ Collection<ByteSource> sources = BuilderUtils.streamsToByteSources(schemas);
+ return parser.parseSources(sources);
+ }
+}
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc-reply message-id="2"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <test-string xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+ test rpc input string 1
+ </test-string>
+ <test-string2 xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+ test rpc input string 2
+ </test-string2>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc message-id="2"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <nonvoid-rpc xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+ <test-string>
+ test rpc input string 1
+ </test-string>
+ <test-string2>
+ test rpc input string 2
+ </test-string2>
+ </nonvoid-rpc>
+</rpc>
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc message-id="2"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <void-input-output-rpc xmlns="urn:opendaylight:mdsal:mapping:rpc:test"/>
+</rpc>
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc message-id="2"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <void-output-rpc xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+ <test-string>
+ test rpc input string 1
+ </test-string>
+ <test-string2>
+ test rpc input string 2
+ </test-string2>
+ </void-output-rpc>
+</rpc>
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="2">
+ <ok/>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+module rpc-test {
+ yang-version 1;
+ namespace "urn:opendaylight:mdsal:mapping:rpc:test";
+ prefix "rpc";
+
+ rpc void-input-output-rpc {
+
+ }
+
+ rpc void-output-rpc {
+ input {
+ leaf test-string {
+ type string;
+ }
+
+ leaf test-string2 {
+ type string;
+ }
+ }
+ }
+
+ rpc nonvoid-rpc {
+ input {
+ leaf test-string {
+ type string;
+ }
+
+ leaf test-string2 {
+ type string;
+ }
+ }
+
+ output {
+ leaf test-string {
+ type string;
+ }
+
+ leaf test-string2 {
+ type string;
+ }
+ }
+ }
+}
+
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
private static final XPathExpression sessionIdXPath = XMLNetconfUtil
.compileXPath("/netconf:hello/netconf:session-id");
+ private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil
+ .compileXPath("/hello/session-id");
+
private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0";
protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences,
}
private long extractSessionId(final Document doc) {
- final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
- Preconditions.checkState(sessionIdNode != null, "");
- String textContent = sessionIdNode.getTextContent();
- if (textContent == null || textContent.equals("")) {
- throw new IllegalStateException("Session id not received from server");
+ String textContent = getSessionIdWithXPath(doc, sessionIdXPath);
+ if (Strings.isNullOrEmpty(textContent)) {
+ textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace);
+ if (Strings.isNullOrEmpty(textContent)) {
+ throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc));
+ }
}
return Long.valueOf(textContent);
}
+ private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) {
+ final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
+ return sessionIdNode != null ? sessionIdNode.getTextContent() : null;
+ }
+
@Override
protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel,
final NetconfHelloMessage message) throws NetconfDocumentedException {
<name>yang-schema-service</name>
</root-schema-service>
<dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:netconf:mdsal:mapper">
- <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
- <name>inmemory-data-broker</name>
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
</dom-broker>
<mapper-aggregator xmlns="urn:opendaylight:params:xml:ns:yang:controller:netconf:mdsal:mapper">
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netconf:north:mapper">prefix:netconf-mapper-registry</type>
import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
-import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class GetSchema extends AbstractLastNetconfOperation {
+public class GetSchema extends AbstractSingletonNetconfOperation {
public static final String GET_SCHEMA = "get-schema";
public static final String IDENTIFIER = "identifier";
public static final String VERSION = "version";
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
- LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
+ LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
this.connectPromise = promise;
startSsh(ctx, remoteAddress);
}
@AfterClass
public static void tearDown() throws Exception {
hashedWheelTimer.stop();
- nettyGroup.shutdownGracefully().await();
+ nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
minaTimerEx.shutdownNow();
nioExec.shutdownNow();
}
*/
package org.opendaylight.controller.netconf.util.mapping;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
public abstract class AbstractSingletonNetconfOperation extends AbstractLastNetconfOperation {
super(netconfSessionIdForReporting);
}
+ @Override
+ protected Element handle(Document document, XmlElement operationElement,
+ NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+ return handleWithNoSubsequentOperations(document, operationElement);
+ }
+
@Override
protected HandlingPriority getHandlingPriority() {
return HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
private static boolean isHelloMessage(final Document document) {
XmlElement element = XmlElement.fromDomElement(document.getDocumentElement());
try {
+ // accept even if hello has no namespace
return element.getName().equals(HELLO_TAG) &&
- element.hasNamespace() &&
- element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
} catch (MissingNameSpaceException e) {
// Cannot happen, since we check for hasNamespace
throw new IllegalStateException(e);
package org.opendaylight.controller.netconf.util.messages;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import java.util.Collection;
import java.util.List;
public static Collection<String> extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException {
XmlElement responseElement = XmlElement.fromDomDocument(doc);
- XmlElement capabilitiesElement = responseElement
- .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES);
- List<XmlElement> caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY);
+ // Extract child element <capabilities> from <hello> with or without(fallback) the same namespace
+ Optional<XmlElement> capabilitiesElement = responseElement
+ .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES)
+ .or(responseElement
+ .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES));
+
+ List<XmlElement> caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY);
return Collections2.transform(caps, new Function<XmlElement, String>() {
@Override
import com.google.common.base.Optional;
import io.netty.channel.local.LocalAddress;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final String PRIVATE_KEY_PATH_PROP = ".pk.path";
private static final String CONNECTION_TIMEOUT_MILLIS_PROP = "connectionTimeoutMillis";
- private static final long DEFAULT_TIMEOUT_MILLIS = 5000;
+ public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);
private static final LocalAddress netconfLocalAddress = new LocalAddress("netconf");
public static LocalAddress getNetconfLocalAddress() {
assertEquals(NetconfConfigUtil.getNetconfLocalAddress(), new LocalAddress("netconf"));
doReturn("").when(bundleContext).getProperty("netconf.connectionTimeoutMillis");
- assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), 5000);
+ assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), NetconfConfigUtil.DEFAULT_TIMEOUT_MILLIS);
doReturn("a").when(bundleContext).getProperty("netconf.connectionTimeoutMillis");
- assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), 5000);
+ assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), NetconfConfigUtil.DEFAULT_TIMEOUT_MILLIS);
}
@Test