From: Moiz Raja Date: Mon, 30 Mar 2015 23:13:25 +0000 (+0000) Subject: Merge "Adjust Tx rate limiter for unused transactions" X-Git-Tag: release/lithium~313^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8eaba1eb027b02f8b36480721055dc99c6700e85;hp=08dd5c2c443ff53f56af88a0e8dc8f34e36d2245 Merge "Adjust Tx rate limiter for unused transactions" --- diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml index c5adb28db7..4afbedb76e 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml @@ -70,6 +70,13 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL ${artifactId}-impl ${symbol_dollar}{project.version} + + ${symbol_dollar}{project.groupId} + ${artifactId}-impl + ${symbol_dollar}{project.version} + xml + config + ${symbol_dollar}{project.groupId} ${artifactId}-api diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 130cb11c5a..2915a8dfd9 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -187,7 +187,7 @@ 2013.09.07.7-SNAPSHOT 1.1.0-SNAPSHOT 0.7.0-SNAPSHOT - 0.12.0 + 0.14.0 0.9.7 3.3.0 diff --git a/opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml b/opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml index 5ef6a245ec..71c4850748 100644 --- a/opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml +++ b/opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml @@ -226,6 +226,13 @@ /modules/module[type='runtime-generated-mapping'][name='runtime-mapping-singleton'] + + binding:binding-normalized-node-serializer + + runtime-mapping-singleton + /modules/module[type='runtime-generated-mapping'][name='runtime-mapping-singleton'] + + binding-impl:binding-new-notification-service diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index c5ae4c41b2..ed19f21ded 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -19,7 +19,6 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -38,7 +37,6 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa public class ExampleActor extends RaftActor { private final Map state = new HashMap(); - private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; private final Optional roleChangeNotifier; @@ -47,7 +45,7 @@ public class ExampleActor extends RaftActor { public ExampleActor(String id, Map peerAddresses, 
Optional configParams) { super(id, peerAddresses, configParams); - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } @@ -185,11 +183,6 @@ public class ExampleActor extends RaftActor { } - @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java new file mode 100644 index 0000000000..a22e57b32a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; +import org.slf4j.Logger; + +/** + * Implementation of ElectionTerm for the RaftActor. + */ +class ElectionTermImpl implements ElectionTerm { + /** + * Identifier of the actor whose election term information this is + */ + private long currentTerm = 0; + private String votedFor = null; + + private final DataPersistenceProvider persistence; + + private final Logger log; + private final String logId; + + ElectionTermImpl(DataPersistenceProvider persistence, String logId, Logger log) { + this.persistence = persistence; + this.logId = logId; + this.log = log; + } + + @Override + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public String getVotedFor() { + return votedFor; + } + + @Override public void update(long currentTerm, String votedFor) { + if(log.isDebugEnabled()) { + log.debug("{}: Set currentTerm={}, votedFor={}", logId, currentTerm, votedFor); + } + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + @Override + public void updateAndPersist(long currentTerm, String votedFor){ + update(currentTerm, votedFor); + // FIXME : Maybe first persist then update the state + persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 15063cff5b..bcfd472bf6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -10,21 +10,17 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class FollowerLogInformationImpl implements FollowerLogInformation { - private static 
final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); - private static final AtomicLongFieldUpdater MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex"); - private final String id; private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RaftActorContext context; - private volatile long nextIndex; + private long nextIndex; - private volatile long matchIndex; + private long matchIndex; private long lastReplicatedIndex = -1L; @@ -39,13 +35,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { } @Override - public long incrNextIndex(){ - return NEXT_INDEX_UPDATER.incrementAndGet(this); + public long incrNextIndex() { + return nextIndex++; } @Override public long decrNextIndex() { - return NEXT_INDEX_UPDATER.decrementAndGet(this); + return nextIndex--; } @Override @@ -60,7 +56,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public long incrMatchIndex(){ - return MATCH_INDEX_UPDATER.incrementAndGet(this); + return matchIndex++; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java new file mode 100644 index 0000000000..c1267fa75b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; + +/** + * An akka Procedure that does nothing. 
+ * + * @author Thomas Pantelis + */ +public class NoopProcedure implements Procedure { + + private static final NoopProcedure INSTANCE = new NoopProcedure<>(); + + private NoopProcedure() { + } + + @SuppressWarnings("unchecked") + public static NoopProcedure instance() { + return (NoopProcedure) INSTANCE; + } + + @Override + public void apply(Object notUsed) { + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index aa72485187..a13b6ff95a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -30,6 +29,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; @@ -40,7 +42,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -98,12 +99,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - private static final Procedure APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK = - new Procedure() { - @Override - public void apply(ApplyJournalEntries param) throws Exception { - } - }; + private static final String COMMIT_SNAPSHOT = "commit_snapshot"; protected final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -119,13 +115,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ private final RaftActorContextImpl context; + private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); + + private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); + /** * The in-memory journal */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); - private CaptureSnapshot captureSnapshot = null; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; @@ 
-140,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional configParams) { context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(), + this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG), -1, -1, replicatedLog, peerAddresses, (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG); @@ -340,7 +338,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex()); } - persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK); + persistence().persist(applyEntries, NoopProcedure.instance()); } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); @@ -379,26 +377,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", persistenceId(), saveSnapshotFailure.cause()); - context.getReplicatedLog().snapshotRollback(); - - LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().size()); + context.getSnapshotManager().rollback(); } else if (message instanceof CaptureSnapshot) { LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - if(captureSnapshot == null) { - captureSnapshot = (CaptureSnapshot)message; - createSnapshot(); - } + context.getSnapshotManager().create(createSnapshotProcedure); - } else if (message instanceof CaptureSnapshotReply){ + } else if (message instanceof CaptureSnapshotReply) { handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); + } else if (message.equals(COMMIT_SNAPSHOT)) { + commitSnapshot(-1); } else { reusableBehaviorStateHolder.init(currentBehavior); @@ -416,7 +407,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog.dataSize()) .inMemoryJournalLogSize(replicatedLog.size()) - .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated()) + .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing()) .lastApplied(context.getLastApplied()) .lastIndex(replicatedLog.lastIndex()) .lastTerm(replicatedLog.lastTerm()) @@ -515,15 +506,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // the state to durable storage self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); - // Check if the "real" snapshot capture has been initiated. 
If no then do the fake snapshot - if(!context.isSnapshotCaptureInitiated()){ - raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), - raftContext.getTermInformation().getCurrentTerm()); - raftContext.getReplicatedLog().snapshotCommit(); - } else { - LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", - persistenceId(), getId()); - } + context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + } else if (clientActor != null) { // Send message for replication currentBehavior.handleMessage(getSelf(), @@ -602,6 +586,41 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setConfigParams(configParams); } + public final DataPersistenceProvider persistence() { + return delegatingPersistenceProvider.getDelegate(); + } + + public void setPersistence(DataPersistenceProvider provider) { + delegatingPersistenceProvider.setDelegate(provider); + } + + protected void setPersistence(boolean persistent) { + if(persistent) { + setPersistence(new PersistentDataProvider(this)); + } else { + setPersistence(new NonPersistentDataProvider() { + /** + * The way snapshotting works is, + *
+             * 1. RaftActor calls createSnapshot on the Shard
+             * 2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+             * 3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+             * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+             * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+             * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+ */ + @Override + public void saveSnapshot(Object o) { + // Make saving Snapshot successful + // Committing the snapshot here would end up calling commit in the creating state which would + // be a state violation. That's why now we send a message to commit the snapshot. + self().tell(COMMIT_SNAPSHOT, self()); + } + }); + } + } + /** * setPeerAddress sets the address of a known peer at a later time. *

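The non-persistent saveSnapshot override above deliberately avoids committing inline: committing while the snapshot machinery is still in its creating state would violate the state machine, so the actor instead sends COMMIT_SNAPSHOT to itself and commits on a later dispatch. A minimal standalone sketch of that defer-via-self-message pattern follows (not part of this commit; the class name, message strings, and println are illustrative only, assuming the classic Akka UntypedActor API used elsewhere in this codebase):

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

// Illustrative only: the "defer work by messaging yourself" idiom used by the
// non-persistent provider's saveSnapshot override.
public class SelfCommitSketch extends UntypedActor {
    private static final String COMMIT_SNAPSHOT = "commit_snapshot";

    @Override
    public void onReceive(Object message) {
        if ("saveSnapshot".equals(message)) {
            // Committing here would re-enter state-machine code that is still running,
            // so schedule the commit as a separate message to this same actor.
            getSelf().tell(COMMIT_SNAPSHOT, getSelf());
        } else if (COMMIT_SNAPSHOT.equals(message)) {
            // Handled on a later dispatch, once the current message has been fully processed.
            System.out.println("commitSnapshot(-1) would run here");
        } else {
            unhandled(message);
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sketch");
        ActorRef actor = system.actorOf(Props.create(SelfCommitSketch.class), "selfCommit");
        actor.tell("saveSnapshot", ActorRef.noSender());
    }
}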
@@ -621,10 +640,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void commitSnapshot(long sequenceNumber) { - context.getReplicatedLog().snapshotCommit(); - - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(sequenceNumber); + context.getSnapshotManager().commit(persistence(), sequenceNumber); } /** @@ -706,8 +722,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract void onStateChanged(); - protected abstract DataPersistenceProvider persistence(); - /** * Notifier Actor for this RaftActor to notify when a role change happens * @return ActorRef - ActorRef of the notifier or Optional.absent if none. @@ -716,17 +730,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void onLeaderChanged(String oldLeader, String newLeader){}; - private void trimPersistentData(long sequenceNumber) { - // Trim akka snapshots - // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied - // For now guessing that it is ANDed. - persistence().deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); - - // Trim akka journal - persistence().deleteMessages(sequenceNumber); - } - private String getLeaderAddress(){ if(isLeader()){ return getSelf().path().toString(); @@ -747,67 +750,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void handleCaptureSnapshotReply(byte[] snapshotBytes) { LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - - Snapshot sn = Snapshot.create(snapshotBytes, - context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), - captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), - captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - - persistence().saveSnapshot(sn); - - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - - long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - if (context.getReplicatedLog().dataSize() > dataThreshold) { - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", - persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, - captureSnapshot.getLastAppliedIndex()); - } - - // if memory is less, clear the log based on lastApplied. - // this could/should only happen if one of the followers is down - // as normally we keep removing from the log when its replicated to all. - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); - - // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an - // install snapshot to a follower. 
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) { - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } - } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ - // clear the log based on replicatedToAllIndex - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), - captureSnapshot.getReplicatedToAllTerm()); - - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } else { - // The replicatedToAllIndex was not found in the log - // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. - // In this scenario we may need to save the snapshot to the akka persistence - // snapshot for recovery but we do not need to do the replicated log trimming. - context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - } - - - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + - "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - - if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { - // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot( - ByteString.copyFrom(snapshotBytes))); - } - - captureSnapshot = null; - context.setSnapshotCaptureInitiated(false); + context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory()); } protected long getTotalMemory() { @@ -819,9 +762,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { - private static final int DATA_SIZE_DIVIDER = 5; - private long dataSizeSinceLastSnapshot = 0; + private long dataSizeSinceLastSnapshot = 0L; + public ReplicatedLogImpl(Snapshot snapshot) { super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), @@ -887,9 +830,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex() + 1; - if(!hasFollowers()) { + if (!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log // due to this the journalSize will never become anything close to the // snapshot batch count. In fact will mostly be 1. 
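The surrounding appendAndPersist hunks gate snapshot capture on a journal-size modulus combined with a memory-based threshold, approximating the no-followers case with dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER. A compact standalone restatement of that check (not part of this commit; the parameter names and the example values in main() are assumptions for illustration):

public final class SnapshotTriggerSketch {
    private static final int DATA_SIZE_DIVIDER = 5;

    static boolean shouldCapture(long journalSize, long dataSize, long dataSizeSinceLastSnapshot,
            boolean hasFollowers, long totalMemory, long thresholdPercentage, long snapshotBatchCount) {
        // Without followers the in-memory journal stays near size 1, so use the data accumulated
        // since the last snapshot (scaled down) instead of the raw journal data size.
        long dataSizeForCheck = hasFollowers ? dataSize : dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
        long dataThreshold = totalMemory * thresholdPercentage / 100;
        return journalSize % snapshotBatchCount == 0 || dataSizeForCheck > dataThreshold;
    }

    public static void main(String[] args) {
        // Example: 100th entry, ~2 MB accumulated since the last snapshot, no followers,
        // 1 GB of total memory, 12% threshold, snapshot batch count of 20000.
        System.out.println(shouldCapture(100, 2_000_000, 2_000_000, false, 1_000_000_000L, 12, 20000));
    }
}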
@@ -903,51 +845,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // as if we were maintaining a real snapshot dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; } - + long journalSize = replicatedLogEntry.getIndex() + 1; long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - // when a snaphsot is being taken, captureSnapshot != null - if (!context.isSnapshotCaptureInitiated() && - ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || - dataSizeForCheck > dataThreshold)) { - - dataSizeSinceLastSnapshot = 0; + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," + - " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold); + if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 + || dataSizeForCheck > dataThreshold)) { - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); - if (!hasFollowers()) { - lastAppliedIndex = replicatedLogEntry.getIndex(); - lastAppliedTerm = replicatedLogEntry.getTerm(); - } else if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } + boolean started = context.getSnapshotManager().capture(replicatedLogEntry, + currentBehavior.getReplicatedToAllIndex()); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); - LOG.debug("{}: Snapshot Capture lastApplied:{} ", - persistenceId(), context.getLastApplied()); - LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), - lastAppliedIndex); - LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), - lastAppliedTerm); + if(started){ + dataSizeSinceLastSnapshot = 0; } - // send a CaptureSnapshot to self to make the expensive operation async. - long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? 
replicatedToAllEntry.getTerm() : -1)), - null); - context.setSnapshotCaptureInitiated(true); } + if (callback != null){ callback.apply(replicatedLogEntry); } @@ -971,46 +884,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - - private class ElectionTermImpl implements ElectionTerm { - /** - * Identifier of the actor whose election term information this is - */ - private long currentTerm = 0; - private String votedFor = null; - - @Override - public long getCurrentTerm() { - return currentTerm; - } - - @Override - public String getVotedFor() { - return votedFor; - } - - @Override public void update(long currentTerm, String votedFor) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor); - } - this.currentTerm = currentTerm; - this.votedFor = votedFor; - } - - @Override - public void updateAndPersist(long currentTerm, String votedFor){ - update(currentTerm, votedFor); - // FIXME : Maybe first persist then update the state - persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ - - @Override public void apply(UpdateElectionTerm param) - throws Exception { - - } - }); - } - } - static class UpdateElectionTerm implements Serializable { private static final long serialVersionUID = 1L; private final long currentTerm; @@ -1030,28 +903,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider { - - public NonPersistentRaftDataProvider(){ - - } + private class CreateSnapshotProcedure implements Procedure { - /** - * The way snapshotting works is, - *

-     * 1. RaftActor calls createSnapshot on the Shard
-     * 2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
-     * 3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
-     * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
-     * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
-     * in SaveSnapshotSuccess.
- * @param o - */ @Override - public void saveSnapshot(Object o) { - // Make saving Snapshot successful - commitSnapshot(-1L); + public void apply(Void aVoid) throws Exception { + createSnapshot(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 9d391a1588..2e7eb5eb3a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -166,8 +166,6 @@ public interface RaftActorContext { */ ConfigParams getConfigParams(); - void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated); - - boolean isSnapshotCaptureInitiated(); + SnapshotManager getSnapshotManager(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 6fc5e4369b..eb059d60fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext { private boolean snapshotCaptureInitiated; + // Snapshot manager will need to be created on demand as it needs raft actor context which cannot + // be passed to it in the constructor + private SnapshotManager snapshotManager; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, @@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } - @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; - } - @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } @@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext { peerAddresses.put(peerId, peerAddress); } + + public SnapshotManager getSnapshotManager() { + if(snapshotManager == null){ + snapshotManager = new SnapshotManager(this, LOG); + } + return snapshotManager; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java new file mode 100644 index 0000000000..8121f75191 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -0,0 +1,431 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +public class SnapshotManager implements SnapshotState { + + + private final SnapshotState IDLE = new Idle(); + private final SnapshotState CAPTURING = new Capturing(); + private final SnapshotState PERSISTING = new Persisting(); + private final SnapshotState CREATING = new Creating(); + + private final Logger LOG; + private final RaftActorContext context; + private final LastAppliedTermInformationReader lastAppliedTermInformationReader = + new LastAppliedTermInformationReader(); + private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader = + new ReplicatedToAllTermInformationReader(); + + + private SnapshotState currentState = IDLE; + private CaptureSnapshot captureSnapshot; + + public SnapshotManager(RaftActorContext context, Logger logger) { + this.context = context; + this.LOG = logger; + } + + @Override + public boolean isCapturing() { + return currentState.isCapturing(); + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower); + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return currentState.capture(lastLogEntry, replicatedToAllIndex); + } + + @Override + public void create(Procedure callback) { + currentState.create(callback); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + currentState.commit(persistenceProvider, sequenceNumber); + } + + @Override + public void rollback() { + currentState.rollback(); + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return currentState.trimLog(desiredTrimIndex, currentBehavior); + } + + private boolean hasFollowers(){ + return context.getPeerAddresses().keySet().size() > 0; + } + + private String persistenceId(){ + return context.getId(); + } + + private class AbstractSnapshotState implements SnapshotState { + + @Override + public boolean isCapturing() { + return false; + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + LOG.debug("capture should not be called in state {}", this); + return false; + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + LOG.debug("captureToInstall should not be called in state {}", this); + return false; + } + + @Override + 
public void create(Procedure callback) { + LOG.debug("create should not be called in state {}", this); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + LOG.debug("persist should not be called in state {}", this); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + LOG.debug("commit should not be called in state {}", this); + } + + @Override + public void rollback() { + LOG.debug("rollback should not be called in state {}", this); + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + LOG.debug("trimLog should not be called in state {}", this); + return -1; + } + + protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){ + // we would want to keep the lastApplied as its used while capturing snapshots + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + + if(LOG.isTraceEnabled()) { + LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}", + persistenceId(), desiredTrimIndex, lastApplied, tempMin); + } + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, + context.getTermInformation().getCurrentTerm()); + + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotCommit(); + return tempMin; + } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) { + // It's possible a follower was lagging and an install snapshot advanced its match index past + // the current replicatedToAllIndex. Since the follower is now caught up we should advance the + // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely + // due to a previous snapshot triggered by the memory threshold exceeded, in that case we + // trim the log to the last applied index even if previous entries weren't replicated to all followers. + currentBehavior.setReplicatedToAllIndex(tempMin); + } + return -1; + } + } + + private class Idle extends AbstractSnapshotState { + + private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + TermInformationReader lastAppliedTermInfoReader = + lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), + lastLogEntry, hasFollowers()); + + long lastAppliedIndex = lastAppliedTermInfoReader.getIndex(); + long lastAppliedTerm = lastAppliedTermInfoReader.getTerm(); + + TermInformationReader replicatedToAllTermInfoReader = + replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex); + + long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex(); + long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); + + // send a CaptureSnapshot to self to make the expensive operation async. 
+ captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), + lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, + newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null); + + SnapshotManager.this.currentState = CAPTURING; + + if(targetFollower != null){ + LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); + } else { + LOG.info("{}: Initiating snapshot capture {} to install on {}", + persistenceId(), captureSnapshot, targetFollower); + } + + context.getActor().tell(captureSnapshot, context.getActor()); + + return true; + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null); + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return capture(lastLogEntry, replicatedToAllIndex, targetFollower); + } + + @Override + public String toString() { + return "Idle"; + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return doTrimLog(desiredTrimIndex, currentBehavior); + } + } + + private class Capturing extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void create(Procedure callback) { + try { + callback.apply(null); + SnapshotManager.this.currentState = CREATING; + } catch (Exception e) { + LOG.error("Unexpected error occurred", e); + } + } + + @Override + public String toString() { + return "Capturing"; + } + + } + + private class Creating extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(snapshotBytes, + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + persistenceProvider.saveSnapshot(sn); + + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); + + long dataThreshold = totalMemory * + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", + persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } + + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an + // install snapshot to a follower. 
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) { + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } + + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else { + // The replicatedToAllIndex was not found in the log + // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. + // In this scenario we may need to save the snapshot to the akka persistence + // snapshot for recovery but we do not need to do the replicated log trimming. + context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); + } + + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + if (context.getId().equals(currentBehavior.getLeaderId()) + && captureSnapshot.isInstallSnapshotInitiated()) { + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot( + ByteString.copyFrom(snapshotBytes))); + } + + captureSnapshot = null; + SnapshotManager.this.currentState = PERSISTING; + } + + @Override + public String toString() { + return "Creating"; + } + + } + + private class Persisting extends AbstractSnapshotState { + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); + + persistenceProvider.deleteMessages(sequenceNumber); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public void rollback() { + context.getReplicatedLog().snapshotRollback(); + + LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." 
+ + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public String toString() { + return "Persisting"; + } + + } + + private static interface TermInformationReader { + long getIndex(); + long getTerm(); + } + + static class LastAppliedTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, + ReplicatedLogEntry lastLogEntry, boolean hasFollowers){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + if (!hasFollowers) { + if(lastLogEntry != null) { + index = lastLogEntry.getIndex(); + term = lastLogEntry.getTerm(); + } + } else if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } else if(log.getSnapshotIndex() > -1){ + index = log.getSnapshotIndex(); + term = log.getSnapshotTerm(); + } + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } + + private static class ReplicatedToAllTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + + if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } + + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java new file mode 100644 index 0000000000..9a9bf1c774 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + +public interface SnapshotState { + /** + * Should return true when a snapshot is being captured + * @return + */ + boolean isCapturing(); + + /** + * Initiate capture snapshot + * + * @param lastLogEntry the last entry in the replicated log + * @param replicatedToAllIndex the current replicatedToAllIndex + * + * @return true if capture was started + */ + boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + + /** + * Initiate capture snapshot for the purposing of installing that snapshot + * + * @param lastLogEntry + * @param replicatedToAllIndex + * @param targetFollower + * + * @return true if capture was started + */ + boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); + + /** + * Create the snapshot + * + * @param callback a procedure to be called which should create the snapshot + */ + void create(Procedure callback); + + /** + * Persist the snapshot + * + * @param persistenceProvider + * @param snapshotBytes + * @param currentBehavior + * @param totalMemory + */ + void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior + ,long totalMemory); + + /** + * Commit the snapshot by trimming the log + * + * @param persistenceProvider + * @param sequenceNumber + */ + void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber); + + /** + * Rollback the snapshot + */ + void rollback(); + + /** + * Trim the log + * + * @param desiredTrimIndex + * @return the actual trim index + */ + long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a63c62fa30..2c433f9007 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } @@ -388,7 +387,7 @@ public abstract class 
AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (wasLastChunk && !context.isSnapshotCaptureInitiated()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { @@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - - CaptureSnapshot captureSnapshot = new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? 
replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId, - captureSnapshot); - } - - actor().tell(captureSnapshot, actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index a1bcf8541c..c276d32cce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -39,6 +39,8 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); + /** * Information about the RaftActor whose behavior this class represents */ @@ -254,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // message is sent to itself electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), + context.getActor(), ELECTION_TIMEOUT, context.getActorSystem().dispatcher(), context.getActor()); } @@ -460,31 +462,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param snapshotCapturedIndex */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { - // we would want to keep the lastApplied as its used while capturing snapshots - long lastApplied = context.getLastApplied(); - long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); - - if(LOG.isTraceEnabled()) { - LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}", - logName, snapshotCapturedIndex, lastApplied, tempMin); - } + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); - if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin, - context.getTermInformation().getCurrentTerm()); - - //use the term of the temp-min, since we check for isPresent, entry will not be null - ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); - context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); - context.getReplicatedLog().snapshotCommit(); - setReplicatedToAllIndex(tempMin); - } else if(tempMin > getReplicatedToAllIndex()) { - // It's possible a follower was lagging and an install snapshot advanced its match index past - // the current replicatedToAllIndex. Since the follower is now caught up we should advance the - // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely - // due to a previous snapshot triggered by the memory threshold exceeded, in that case we - // trim the log to the last applied index even if previous entries weren't replicated to all followers. 
- setReplicatedToAllIndex(tempMin); + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index b36c41abf2..74bede171f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -58,7 +58,14 @@ public class Candidate extends AbstractRaftActorBehavior { votesRequired = getMajorityVoteCount(peers.size()); startNewTerm(); - scheduleElection(electionDuration()); + + if(context.getPeerAddresses().isEmpty()){ + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } + + } @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index bdd459ecff..a6722e6ff9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -46,9 +46,14 @@ public class Follower extends AbstractRaftActorBehavior { public Follower(RaftActorContext context) { super(context, RaftState.Follower); - scheduleElection(electionDuration()); - initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); + + if(context.getPeerAddresses().isEmpty()){ + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } + } private boolean isLogEntryPresent(long index){ @@ -255,7 +260,7 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(reply, actor()); - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 120a3a16a9..dfaa8d55f6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -57,7 +57,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private TestRaftActor(String id, Map peerAddresses, ConfigParams config, TestActorRef collectorActor) { super(id, peerAddresses, Optional.of(config), null); - dataPersistenceProvider = new PersistentDataProvider(); this.collectorActor = collectorActor; } @@ -261,4 +260,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex()); assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData()); } + + protected String testActorPath(String id){ + 
return "akka://test/user" + id; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 1cc7b5f576..53cca23741 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext { private Map peerAddresses = new HashMap<>(); private ConfigParams configParams; private boolean snapshotCaptureInitiated; + private SnapshotManager snapshotManager; public MockRaftActorContext(){ electionTerm = new ElectionTerm() { @@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext { } @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; + public SnapshotManager getSnapshotManager() { + if(this.snapshotManager == null){ + this.snapshotManager = new SnapshotManager(this, getLogger()); + } + return this.snapshotManager; } public void setConfigParams(ConfigParams configParams) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 34932c7249..17a81ac3c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -53,14 +54,15 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -97,7 +99,6 @@ public class RaftActorTest extends AbstractActorTest { public static class 
MockRaftActor extends RaftActor { - protected DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; @@ -137,9 +138,9 @@ public class RaftActorTest extends AbstractActorTest { state = new ArrayList<>(); this.delegate = mock(RaftActor.class); if(dataPersistenceProvider == null){ - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); } else { - this.dataPersistenceProvider = dataPersistenceProvider; + setPersistence(dataPersistenceProvider); } } @@ -159,6 +160,16 @@ public class RaftActorTest extends AbstractActorTest { } } + + public void waitUntilLeader(){ + for(int i = 0;i < 10; i++){ + if(isLeader()){ + break; + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + public List getState() { return state; } @@ -178,6 +189,13 @@ public class RaftActorTest extends AbstractActorTest { return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier, + DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); @@ -235,11 +253,6 @@ public class RaftActorTest extends AbstractActorTest { delegate.onStateChanged(); } - @Override - protected DataPersistenceProvider persistence() { - return this.dataPersistenceProvider; - } - @Override protected Optional getRoleChangeNotifier() { return Optional.fromNullable(roleChangeNotifier); @@ -525,7 +538,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 1, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -590,7 +603,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("remove log entries", 0, replicatedLog.size()); - mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar")); + mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar")); assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm()); assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor()); @@ -675,11 +688,13 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class)); } }; } @@ -703,9 +718,11 @@ public class RaftActorTest extends AbstractActorTest { 
mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); } @@ -738,10 +755,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -765,17 +784,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + ImmutableMap.of("leader", "fake/path"), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -787,7 +807,8 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); long replicatedToAllIndex = 1; - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); + + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); verify(mockRaftActor.delegate).createSnapshot(); @@ -929,7 +950,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -944,7 +967,7 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftRoleChangeNotifier() throws Exception { + public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception { new JavaTestKit(getSystem()) {{ TestActorRef notifierActor = 
factory.createTestActor( Props.create(MessageCollectorActor.class)); @@ -953,15 +976,17 @@ public class RaftActorTest extends AbstractActorTest { DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); long heartBeatInterval = 100; config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); - config.setElectionTimeoutFactor(1); + config.setElectionTimeoutFactor(20); String persistenceId = factory.generateActorId("notifier-"); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + Collections.emptyMap(), Optional.of(config), notifierActor, + new NonPersistentDataProvider()), persistenceId); List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); + // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); assertEquals(persistenceId, raftRoleChanged.getMemberId()); @@ -1017,6 +1042,49 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(1); + + String persistenceId = factory.generateActorId("notifier-"); + + factory.createActor(MockRaftActor.props(persistenceId, + ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); + + List matches = null; + for(int i = 0; i < 5000 / heartBeatInterval; i++) { + matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + if(matches.size() == 3) { + break; + } + Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); + } + + assertEquals(2, matches.size()); + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + }}; + } + @Test public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { new JavaTestKit(getSystem()) { @@ -1059,9 +1127,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1087,8 +1156,14 @@ public class RaftActorTest extends AbstractActorTest { new 
MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1151,9 +1226,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1188,7 +1264,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1286,7 +1365,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -1297,38 +1376,6 @@ public class RaftActorTest extends AbstractActorTest { }; } - - private static class NonPersistentProvider implements DataPersistenceProvider { - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void saveSnapshot(Object o) { - - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } - @Test public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1338,7 +1385,7 @@ public class 
RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1370,7 +1417,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -1385,7 +1432,7 @@ public class RaftActorTest extends AbstractActorTest { config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); config.setSnapshotBatchCount(5); - DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); Map peerAddresses = new HashMap<>(); @@ -1413,7 +1460,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }}; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index 7a291f364b..bd670fd581 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -62,9 +62,11 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt // Create the leader and 2 follower actors and verify initial syncing of the followers after leader // persistence recovery. - follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams()); + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), newFollowerConfigParams()); - follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), newFollowerConfigParams()); peerAddresses = ImmutableMap.builder(). put(follower1Id, follower1Actor.path().toString()). 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index aca19c0b8b..d4a9f7701b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -56,10 +56,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId)); // Create the leader and 2 follower actors. + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), newFollowerConfigParams()); - follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams()); - - follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), newFollowerConfigParams()); Map peerAddresses = ImmutableMap.builder(). put(follower1Id, follower1Actor.path().toString()). diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java new file mode 100644 index 0000000000..5a0d5aed74 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -0,0 +1,672 @@ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import akka.testkit.TestActorRef; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.LoggerFactory; + +public class SnapshotManagerTest extends 
AbstractActorTest { + + @Mock + private RaftActorContext mockRaftActorContext; + + @Mock + private ConfigParams mockConfigParams; + + @Mock + private ReplicatedLog mockReplicatedLog; + + @Mock + private DataPersistenceProvider mockDataPersistenceProvider; + + @Mock + private RaftActorBehavior mockRaftActorBehavior; + + @Mock + private Procedure mockProcedure; + + private SnapshotManager snapshotManager; + + private TestActorFactory factory; + + private TestActorRef actorRef; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses(); + doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams(); + doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); + doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); + doReturn("123").when(mockRaftActorContext).getId(); + doReturn("123").when(mockRaftActorBehavior).getLeaderId(); + + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + + snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); + factory = new TestActorFactory(getSystem()); + + actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); + doReturn(actorRef).when(mockRaftActorContext).getActor(); + + } + + @After + public void tearDown(){ + factory.close(); + } + + @Test + public void testConstruction(){ + assertEquals(false, snapshotManager.isCapturing()); + } + + @Test + public void testCaptureToInstall(){ + + // Force capturing toInstall = true + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload()), 0, "follower-1"); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(0L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(0L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + actorRef.underlyingActor().clear(); + } + + @Test + public void testCapture(){ + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(9L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(9L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + + 
actorRef.underlyingActor().clear(); + + } + + @Test + public void testIllegalCapture() throws Exception { + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + List allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + + // This will not cause snapshot capture to start again + capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertFalse(capture); + + allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + } + + @Test + public void testPersistWhenReplicatedToAllIndexMinusOne(){ + doReturn(7L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(1L).when(mockReplicatedLog).getSnapshotTerm(); + + doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses(); + + doReturn(8L).when(mockRaftActorContext).getLastApplied(); + + MockRaftActorContext.MockReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 3L, 9L, new MockRaftActorContext.MockPayload()); + + MockRaftActorContext.MockReplicatedLogEntry lastAppliedEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 2L, 8L, new MockRaftActorContext.MockPayload()); + + doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L); + doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(lastLogEntry, -1); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); + verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); + + Snapshot snapshot = snapshotArgumentCaptor.getValue(); + + assertEquals("getLastTerm", 3L, snapshot.getLastTerm()); + assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); + assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm()); + assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex()); + assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries()); + + verify(mockReplicatedLog).snapshotPreCommit(7L, 1L); + } + + + @Test + public void testCreate() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure).apply(null); + + assertEquals("isCapturing", true, snapshotManager.isCapturing()); + } + + @Test + public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + } + + @Test + public void testCallingCreateBeforeCapture() throws Exception { + snapshotManager.create(mockProcedure); + + verify(mockProcedure, 
times(0)).apply(null); + } + + @Test + public void testCallingCreateAfterPersist() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + reset(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, never()).apply(null); + } + + @Test + public void testPersistWhenReplicatedToAllIndexNotMinus(){ + doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9); + doReturn(6L).when(replicatedLogEntry).getTerm(); + doReturn(9L).when(replicatedLogEntry).getIndex(); + + // when replicatedToAllIndex != -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), 9); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); + verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); + + Snapshot snapshot = snapshotArgumentCaptor.getValue(); + + assertEquals("getLastTerm", 6L, snapshot.getLastTerm()); + assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); + assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm()); + assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex()); + assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size()); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(9); + } + + + @Test + public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + } + + @Test + public void testPersistSendInstallSnapshot(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + assertTrue(capture); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + 
ArgumentCaptor sendInstallSnapshotArgumentCaptor + = ArgumentCaptor.forClass(SendInstallSnapshot.class); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture()); + + SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); + + assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray())); + } + + @Test + public void testCallingPersistWithoutCaptureWillDoNothing(){ + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + @Test + public void testCallingPersistTwiceWillDoNoHarm(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + + @Test + public void testCommit(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog).snapshotCommit(); + + verify(mockDataPersistenceProvider).deleteMessages(100L); + + ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); + + verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture()); + + assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100 + // config snapShotBatchCount = 10 + // therefore maxSequenceNumber = 90 + } + + @Test + public void testCommitBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(100L); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + @Test + public void testCommitBeforeCapture(){ + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong()); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + 
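// --- Editor's note, not part of the patch: the tests in this new SnapshotManagerTest exercise the
// --- SnapshotManager lifecycle in a fixed order. A minimal sketch of the happy path, assuming the
// --- same calls these tests make (variable names such as context, createSnapshotProcedure,
// --- persistenceProvider, currentBehavior and sequenceNumber are illustrative, not from the patch):
//
//   SnapshotManager mgr = context.getSnapshotManager();
//   boolean started = mgr.capture(lastLogEntry, replicatedToAllIndex);   // isCapturing() becomes true,
//                                                                        // CaptureSnapshot sent to the actor
//   if (started) {
//       mgr.create(createSnapshotProcedure);                             // invoked at most once per capture
//       mgr.persist(persistenceProvider, snapshotBytes,                  // saveSnapshot + snapshotPreCommit;
//                   currentBehavior, Runtime.getRuntime().totalMemory()); // after captureToInstall it also hands
//                                                                         // a SendInstallSnapshot to the behavior
//       mgr.commit(persistenceProvider, sequenceNumber);                 // snapshotCommit + deleteMessages/deleteSnapshots
//   }
//   // on failure after persist: mgr.rollback();                         // snapshotRollback()
//
// --- Out-of-order calls (create before capture, commit before persist, repeated persist/commit/rollback)
// --- are treated as no-ops, which is exactly what the surrounding tests verify.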
@Test + public void testCallingCommitMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, times(1)).snapshotCommit(); + + verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L); + + verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + } + + @Test + public void testRollback(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.rollback(); + + verify(mockReplicatedLog).snapshotRollback(); + } + + + @Test + public void testRollbackBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testRollbackBeforeCapture(){ + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testCallingRollbackMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.rollback(); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, times(1)).snapshotRollback(); + } + + @Test + public void testTrimLogWhenTrimIndexLessThanLastApplied() { + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + assertEquals("return index", 10L, retIndex); + + verify(mockReplicatedLog).snapshotPreCommit(10, 5); + verify(mockReplicatedLog).snapshotCommit(); + + verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); + } + + @Test + public void testTrimLogWhenLastAppliedNotSet() { + doReturn(-1L).when(mockRaftActorContext).getLastApplied(); + + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + assertEquals("return index", -1L, retIndex); + + verify(mockReplicatedLog, 
never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); + } + + @Test + public void testTrimLogWhenLastAppliedZero() { + doReturn(0L).when(mockRaftActorContext).getLastApplied(); + + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + assertEquals("return index", -1L, retIndex); + + verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); + } + + @Test + public void testTrimLogWhenTrimIndexNotPresent() { + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + + doReturn(false).when(mockReplicatedLog).isPresent(10); + + long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + assertEquals("return index", -1L, retIndex); + + verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + // Trim index is greater than replicatedToAllIndex so should update it. + verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L); + } + + @Test + public void testTrimLogAfterCapture(){ + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10, mockRaftActorBehavior); + + verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + + @Test + public void testTrimLogAfterCaptureToInstall(){ + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9, "follower-1"); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10, mockRaftActorBehavior); + + verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + + @Test + public void testLastAppliedTermInformationReader() { + + LastAppliedTermInformationReader reader = new LastAppliedTermInformationReader(); + + doReturn(4L).when(mockReplicatedLog).getSnapshotTerm(); + doReturn(7L).when(mockReplicatedLog).getSnapshotIndex(); + + ReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(6L, 9L, + new MockRaftActorContext.MockPayload()); + + // No followers and valid lastLogEntry + reader.init(mockReplicatedLog, 1L, lastLogEntry, 
false); + + assertEquals("getTerm", 6L, reader.getTerm()); + assertEquals("getIndex", 9L, reader.getIndex()); + + // No followers and null lastLogEntry + reader.init(mockReplicatedLog, 1L, null, false); + + assertEquals("getTerm", -1L, reader.getTerm()); + assertEquals("getIndex", -1L, reader.getIndex()); + + // Followers and valid originalIndex entry + doReturn(new MockRaftActorContext.MockReplicatedLogEntry(5L, 8L, + new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L); + reader.init(mockReplicatedLog, 8L, lastLogEntry, true); + + assertEquals("getTerm", 5L, reader.getTerm()); + assertEquals("getIndex", 8L, reader.getIndex()); + + // Followers and null originalIndex entry and valid snapshot index + reader.init(mockReplicatedLog, 7L, lastLogEntry, true); + + assertEquals("getTerm", 4L, reader.getTerm()); + assertEquals("getIndex", 7L, reader.getIndex()); + + // Followers and null originalIndex entry and invalid snapshot index + doReturn(-1L).when(mockReplicatedLog).getSnapshotIndex(); + reader.init(mockReplicatedLog, 7L, lastLogEntry, true); + + assertEquals("getTerm", -1L, reader.getTerm()); + assertEquals("getIndex", -1L, reader.getIndex()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 60f45523cf..63fd530675 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -1,12 +1,15 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -184,6 +187,20 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { assertEquals("getTerm", 1001, reply.getTerm()); } + @Test + public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + candidate = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 75509bae51..26e4364878 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -12,11 +12,13 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; 
+import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -755,6 +757,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 383ebefd36..ba0bd0f29c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -48,6 +48,7 @@ import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -524,6 +525,8 @@ public class LeaderTest extends AbstractLeaderTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -946,7 +949,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + return createActorContext(LEADER_ID, actorRef); } private MockRaftActorContext createActorContextWithFollower() { @@ -1025,14 +1028,15 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -1267,6 +1271,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext followerActorContext = 
createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setConfigParams(configParams); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java index 270ae50475..d9e58e538d 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java @@ -19,17 +19,21 @@ import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizat import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTree; import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory; +import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry; import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy; import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext; import org.opendaylight.yangtools.yang.binding.BindingMapping; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.binding.RpcService; import org.opendaylight.yangtools.yang.binding.util.BindingReflections; import org.opendaylight.yangtools.yang.common.QNameModule; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.opendaylight.yangtools.yang.model.api.Module; @@ -38,7 +42,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.opendaylight.yangtools.yang.model.api.SchemaPath; -public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, SchemaContextListener, AutoCloseable { +public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, BindingNormalizedNodeSerializer, SchemaContextListener, AutoCloseable { private final BindingNormalizedNodeCodecRegistry codecRegistry; private DataNormalizer legacyToNormalized; @@ -56,16 +60,52 @@ public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, Sc return codecRegistry.toYangInstanceIdentifier(binding); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Entry> toNormalizedNode( - final InstanceIdentifier bindingPath, final DataObject bindingObject) { - return codecRegistry.toNormalizedNode((InstanceIdentifier) bindingPath, bindingObject); + 
@Override + public YangInstanceIdentifier toYangInstanceIdentifier(InstanceIdentifier binding) { + return codecRegistry.toYangInstanceIdentifier(binding); + } + @Override + public Entry> toNormalizedNode( + InstanceIdentifier path, T data) { + return codecRegistry.toNormalizedNode(path, data); } + @SuppressWarnings({"unchecked", "rawtypes"}) public Entry> toNormalizedNode( final Entry, DataObject> binding) { - return toNormalizedNode(binding.getKey(),binding.getValue()); + return toNormalizedNode((InstanceIdentifier) binding.getKey(),binding.getValue()); + } + + @Override + public Entry, DataObject> fromNormalizedNode(YangInstanceIdentifier path, + NormalizedNode data) { + return codecRegistry.fromNormalizedNode(path, data); + } + + @Override + public Notification fromNormalizedNodeNotification(SchemaPath path, ContainerNode data) { + return codecRegistry.fromNormalizedNodeNotification(path, data); + } + + @Override + public DataObject fromNormalizedNodeRpcData(SchemaPath path, ContainerNode data) { + return codecRegistry.fromNormalizedNodeRpcData(path, data); + } + + @Override + public InstanceIdentifier fromYangInstanceIdentifier(YangInstanceIdentifier dom) { + return codecRegistry.fromYangInstanceIdentifier(dom); + } + + @Override + public ContainerNode toNormalizedNodeNotification(Notification data) { + return codecRegistry.toNormalizedNodeNotification(data); + } + + @Override + public ContainerNode toNormalizedNodeRpcData(DataContainer data) { + return codecRegistry.toNormalizedNodeRpcData(data); } /** diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang b/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang index e15cb83385..ee130fdeeb 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang +++ b/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang @@ -70,6 +70,7 @@ module opendaylight-sal-binding-broker-impl { base config:module-type; config:provided-service binding-dom-mapping-service; config:provided-service sal:binding-codec-tree-factory; + config:provided-service sal:binding-normalized-node-serializer; config:java-name-prefix RuntimeMapping; } diff --git a/opendaylight/md-sal/sal-binding-config/src/main/yang/opendaylight-md-sal-binding.yang b/opendaylight/md-sal/sal-binding-config/src/main/yang/opendaylight-md-sal-binding.yang index 81508d1b8f..18a94dfacd 100644 --- a/opendaylight/md-sal/sal-binding-config/src/main/yang/opendaylight-md-sal-binding.yang +++ b/opendaylight/md-sal/sal-binding-config/src/main/yang/opendaylight-md-sal-binding.yang @@ -48,6 +48,11 @@ module opendaylight-md-sal-binding { config:java-class "org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory"; } + identity binding-normalized-node-serializer { + base "config:service-type"; + config:java-class "org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer"; + } + identity binding-notification-subscription-service { base "config:service-type"; config:java-class "org.opendaylight.controller.sal.binding.api.NotificationService"; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java new file mode 100644 index 0000000000..c74236bb47 --- /dev/null +++ 
b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; + +/** + * A DataPersistenceProvider implementation that delegates to another implementation. + * + * @author Thomas Pantelis + */ +public class DelegatingPersistentDataProvider implements DataPersistenceProvider { + private DataPersistenceProvider delegate; + + public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + public void setDelegate(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + public DataPersistenceProvider getDelegate() { + return delegate; + } + + @Override + public boolean isRecoveryApplicable() { + return delegate.isRecoveryApplicable(); + } + + @Override + public void persist(T o, Procedure procedure) { + delegate.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + delegate.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + delegate.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + delegate.deleteMessages(sequenceNumber); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java new file mode 100644 index 0000000000..fed81177a1 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A DataPersistenceProvider implementation with persistence disabled, essentially a no-op. 
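The new DelegatingPersistentDataProvider introduced here simply forwards every DataPersistenceProvider call to a swappable delegate; that is what later allows persistence to be switched on or off at runtime and lets tests intercept individual calls (the TestPersistentDataProvider further down in this change overrides only saveSnapshot and delegates the rest). A minimal sketch of that idea, using a simplified stand-in interface rather than the real DataPersistenceProvider/Akka types:

    // Sketch of the delegation pattern; PersistenceApi is a hypothetical stand-in
    // for DataPersistenceProvider (the real interface also takes Akka's Procedure
    // and SnapshotSelectionCriteria types).
    interface PersistenceApi {
        boolean isRecoveryApplicable();
        void saveSnapshot(Object snapshot);
    }

    class DelegatingPersistence implements PersistenceApi {
        private PersistenceApi delegate;

        DelegatingPersistence(PersistenceApi delegate) {
            this.delegate = delegate;
        }

        void setDelegate(PersistenceApi delegate) {
            this.delegate = delegate;          // swap behaviour at runtime
        }

        @Override
        public boolean isRecoveryApplicable() {
            return delegate.isRecoveryApplicable();
        }

        @Override
        public void saveSnapshot(Object snapshot) {
            delegate.saveSnapshot(snapshot);   // forward unchanged; a subclass may intercept
        }
    }

A subclass only needs to override the one call it cares about and leave the rest to the delegate, which is exactly how the snapshot-capturing test provider in this patch is built.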
+ */ +public class NonPersistentDataProvider implements DataPersistenceProvider { + private static final Logger LOG = LoggerFactory.getLogger(NonPersistentDataProvider.class); + + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + LOG.error("An unexpected error occurred", e); + } + } + + @Override + public void saveSnapshot(Object o) { + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + } + + @Override + public void deleteMessages(long sequenceNumber) { + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java new file mode 100644 index 0000000000..f130a1f27e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Preconditions; + +/** + * A DataPersistenceProvider implementation with persistence enabled. + */ +public class PersistentDataProvider implements DataPersistenceProvider { + + private final UntypedPersistentActor persistentActor; + + public PersistentDataProvider(UntypedPersistentActor persistentActor) { + this.persistentActor = Preconditions.checkNotNull(persistentActor, "persistentActor can't be null"); + } + + @Override + public boolean isRecoveryApplicable() { + return true; + } + + @Override + public void persist(T o, Procedure procedure) { + persistentActor.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + persistentActor.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + persistentActor.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + persistentActor.deleteMessages(sequenceNumber); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java index 432c2d5615..326733f377 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java @@ -8,10 +8,7 @@ package org.opendaylight.controller.cluster.common.actor; -import akka.japi.Procedure; -import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,71 +66,4 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc } unhandled(message); } - - protected class PersistentDataProvider implements DataPersistenceProvider { - - public PersistentDataProvider(){ - - } - - @Override - public boolean isRecoveryApplicable() { - return true; - } - - @Override - public void persist(T o, Procedure procedure) { - AbstractUntypedPersistentActor.this.persist(o, procedure); - } - - @Override - public void saveSnapshot(Object o) { - AbstractUntypedPersistentActor.this.saveSnapshot(o); - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - AbstractUntypedPersistentActor.this.deleteSnapshots(criteria); - } - - @Override - public void deleteMessages(long sequenceNumber) { - AbstractUntypedPersistentActor.this.deleteMessages(sequenceNumber); - } - } - - protected class NonPersistentDataProvider implements DataPersistenceProvider { - - public NonPersistentDataProvider(){ - - } - - @Override - public boolean isRecoveryApplicable() { - return false; - } - - @Override - public void persist(T o, Procedure procedure) { - try { - procedure.apply(o); - } catch (Exception e) { - LOG.error("An unexpected error occurred", e); - } - } - - @Override - public void saveSnapshot(Object o) { - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - - } - - @Override - public void deleteMessages(long sequenceNumber) { - - } - } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java index f315bfdf7a..770d709abe 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java @@ -28,4 +28,13 @@ public class RoleChanged { public String getNewRole() { return newRole; } + + @Override + public String toString() { + return "RoleChanged{" + + "memberId='" + memberId + '\'' + + ", oldRole='" + oldRole + '\'' + + ", newRole='" + newRole + '\'' + + '}'; + } } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 7df398355e..cfbf9450aa 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -35,8 +35,8 @@ operational.persistent=false # failing an operation (eg transaction create and change listener registration). #shard-initialization-timeout-in-seconds=300 -# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken. -#shard-journal-recovery-log-batch-size=5000 +# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store. +#shard-journal-recovery-log-batch-size=1000 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken. 
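The corrected datastore.cfg comment above now matches the yang model change later in this patch: shard-journal-recovery-log-batch-size is the maximum number of journal entries batched during recovery before they are committed to the data store, with the default lowered from 5000 to 1000. A schematic illustration of that batching idea follows; it is not the Shard's actual recovery code, and the commit step is reduced to clearing the buffer:

    // Schematic sketch of recovery batching: entries are buffered and flushed
    // once the batch reaches the configured size, plus once more at the end.
    import java.util.ArrayList;
    import java.util.List;

    class RecoveryBatcher<E> {
        private final int maxBatchSize;            // e.g. shard-journal-recovery-log-batch-size
        private final List<E> batch = new ArrayList<>();

        RecoveryBatcher(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
        }

        void onRecoveredEntry(E entry) {
            batch.add(entry);
            if (batch.size() >= maxBatchSize) {
                commitBatch();
            }
        }

        void onRecoveryComplete() {
            commitBatch();                          // flush the tail batch
        }

        private void commitBatch() {
            // The real shard applies each entry to a write transaction and commits it;
            // this sketch only clears the buffer.
            batch.clear();
        }
    }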
#shard-snapshot-batch-count=20000 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index 886c473067..538f2981da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; @@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { final long startTime = System.nanoTime(); + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Boolean result) { @@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { new TransactionCommitFailedException( "Can Commit failed, no detailed cause available.")); } else { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the preCommit phase doPreCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor); } } } @@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture canCommitFuture = cohort.canCommit(); - Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. 
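The ConcurrentDOMDataBroker hunks here replace the parallel fan-out (one canCommit/preCommit/commit per cohort plus an AtomicInteger countdown) with a sequential chain: the FutureCallback walks an Iterator over the cohorts and re-registers itself on the next cohort's future, so each phase advances only after the previous cohort has answered and the first failure stops the chain. A self-contained sketch of that pattern, where Cohort is a hypothetical stand-in for DOMStoreThreePhaseCommitCohort and a direct executor stands in for the broker's internal callback executor:

    // Sequential chaining sketch: the callback drives the iterator and
    // re-registers itself for the next cohort's future.
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;
    import java.util.Iterator;
    import java.util.List;

    class SequentialCanCommit {
        interface Cohort { ListenableFuture<Boolean> canCommit(); }

        static ListenableFuture<Boolean> canCommitAll(List<Cohort> cohorts) {
            SettableFuture<Boolean> result = SettableFuture.create();
            if (cohorts.isEmpty()) {
                result.set(Boolean.TRUE);              // nothing to ask - succeed immediately
                return result;
            }
            Iterator<Cohort> it = cohorts.iterator();
            FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(Boolean canCommit) {
                    if (!Boolean.TRUE.equals(canCommit)) {
                        result.set(Boolean.FALSE);     // first veto short-circuits the chain
                    } else if (it.hasNext()) {
                        Futures.addCallback(it.next().canCommit(), this,
                                MoreExecutors.directExecutor());
                    } else {
                        result.set(Boolean.TRUE);      // every cohort agreed
                    }
                }
                @Override
                public void onFailure(Throwable t) {
                    result.setException(t);
                }
            };
            Futures.addCallback(it.next().canCommit(), callback, MoreExecutors.directExecutor());
            return result;
        }
    }

The trade-off is latency versus bookkeeping: the cohorts are no longer asked in parallel, but the broker avoids the Futures.allAsList overhead the comment mentions and can stop at the first negative answer.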
- final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the commit phase doCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor); } } @@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture preCommitFuture = cohort.preCommit(); - Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we're done. commitStatsTracker.addDuration(System.nanoTime() - startTime); clientSubmitFuture.set(); + } else { + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor); } } @@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture commitFuture = cohort.commit(); - Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); } private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a30b6f7516..c04256a28e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -23,14 +23,12 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import 
org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; @@ -116,8 +114,6 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; - private DataPersistenceProvider dataPersistenceProvider; - private SchemaContext schemaContext; private int createSnapshotTransactionCounter; @@ -140,7 +136,6 @@ public class Shard extends RaftActor { * Coordinates persistence recovery on startup. */ private ShardRecoveryCoordinator recoveryCoordinator; - private List currentLogRecoveryBatch; private final DOMTransactionFactory transactionFactory; @@ -153,11 +148,10 @@ public class Shard extends RaftActor { this.name = name.toString(); this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; - this.dataPersistenceProvider = (datastoreContext.isPersistent()) - ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) .getDispatcherPath(Dispatchers.DispatcherType.Transaction); + setPersistence(datastoreContext.isPersistent()); LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); @@ -190,6 +184,8 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + + recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG); } private void setTransactionCommitTimeout() { @@ -311,12 +307,10 @@ public class Shard extends RaftActor { setTransactionCommitTimeout(); - if(datastoreContext.isPersistent() && - dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { - dataPersistenceProvider = new PersistentDataProvider(); - } else if(!datastoreContext.isPersistent() && - dataPersistenceProvider instanceof PersistentDataProvider) { - dataPersistenceProvider = new NonPersistentRaftDataProvider(); + if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { + setPersistence(true); + } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { + setPersistence(false); } updateConfigParams(datastoreContext.getShardRaftConfig()); @@ -713,81 +707,27 @@ public class Shard extends RaftActor { @Override protected void startLogRecoveryBatch(final int maxBatchSize) { - currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize); - } + recoveryCoordinator.startLogRecoveryBatch(maxBatchSize); } @Override protected void appendRecoveredLogEntry(final Payload data) { - if(data instanceof ModificationPayload) { - try { - currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); - } catch (ClassNotFoundException | IOException e) { - LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); - } - } else if (data instanceof CompositeModificationPayload) { - currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); - } else if (data instanceof CompositeModificationByteStringPayload) { - currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); - } else { - LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data); - } + recoveryCoordinator.appendRecoveredLogPayload(data); } @Override protected void 
applyRecoverySnapshot(final byte[] snapshotBytes) { - if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, - LOG, name.toString()); - } - - recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: submitted recovery sbapshot", persistenceId()); - } + recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes); } @Override protected void applyCurrentLogRecoveryBatch() { - if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, - LOG, name.toString()); - } - - recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(), - currentLogRecoveryBatch.size()); - } + recoveryCoordinator.applyCurrentLogRecoveryBatch(); } @Override protected void onRecoveryComplete() { - if(recoveryCoordinator != null) { - Collection txList = recoveryCoordinator.getTransactions(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size()); - } - - for(DOMStoreWriteTransaction tx: txList) { - try { - syncCommitTransaction(tx); - shardMBean.incrementCommittedTransactionCount(); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to commit", persistenceId(), e); - } - } - } - recoveryCoordinator = null; - currentLogRecoveryBatch = null; //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); @@ -913,19 +853,10 @@ public class Shard extends RaftActor { } @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - - @Override public String persistenceId() { + public String persistenceId() { return this.name; } - @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - @VisibleForTesting ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 55a86ceeea..cff44b13cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -41,6 +41,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -136,7 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? 
new PersistentDataProvider() : new NonPersistentDataProvider(); + return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); } public static Props props( @@ -197,6 +199,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); } @@ -241,7 +250,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - if (isReady()) { + if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", persistenceId(), type, waitTillReadyCountdownLatch.getCount()); @@ -263,10 +272,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return null; } - private boolean isReady() { + private boolean isReadyWithLeaderId() { boolean isReady = true; for (ShardInformation info : localShards.values()) { - if(!info.isShardReady()){ + if(!info.isShardReadyWithLeaderId()){ isReady = false; break; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 50528575e7..7e547d7257 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -8,19 +8,19 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.Collection; -import java.util.Collections; +import java.io.IOException; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import 
org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; /** @@ -34,115 +34,86 @@ import org.slf4j.Logger; */ class ShardRecoveryCoordinator { - private static final int TIME_OUT = 10; - - private final List resultingTxList = Lists.newArrayList(); - private final SchemaContext schemaContext; + private final InMemoryDOMDataStore store; + private List currentLogRecoveryBatch; private final String shardName; - private final ExecutorService executor; private final Logger log; - private final String name; - ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log, - String name) { - this.schemaContext = schemaContext; + ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) { + this.store = store; this.shardName = shardName; this.log = log; - this.name = name; - - executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ShardRecovery-" + shardName + "-%d").build()); } - /** - * Submits a batch of journal log entries. - * - * @param logEntries the serialized journal log entries - * @param resultingTx the write Tx to which to apply the entries - */ - void submit(List logEntries, DOMStoreWriteTransaction resultingTx) { - LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx); - resultingTxList.add(resultingTx); - executor.execute(task); - } + void startLogRecoveryBatch(int maxBatchSize) { + currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - /** - * Submits a snapshot. - * - * @param snapshotBytes the serialized snapshot - * @param resultingTx the write Tx to which to apply the entries - */ - void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { - SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx); - resultingTxList.add(resultingTx); - executor.execute(task); + log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); } - Collection getTransactions() { - // Shutdown the executor and wait for task completion. 
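For contrast with the synchronous approach the coordinator now takes, the getTransactions() method being removed here queued recovery work on a dedicated daemon thread pool and then shut the pool down and waited up to the old TIME_OUT of ten minutes for it to drain. A minimal sketch of that removed pattern, with a single Runnable standing in for the recovery tasks:

    // Sketch of the removed executor-based pattern: run the work on a pool,
    // then shut down and await termination with a timeout.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class OldStyleRecoverySketch {
        static void runAndAwait(Runnable task) throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors());
            executor.execute(task);
            executor.shutdown();                              // no new tasks accepted
            if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
                throw new IllegalStateException("Recovery timed out");
            }
        }
    }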
- executor.shutdown(); - + void appendRecoveredLogPayload(Payload payload) { try { - if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) { - return resultingTxList; + if(payload instanceof ModificationPayload) { + currentLogRecoveryBatch.add((ModificationPayload) payload); + } else if (payload instanceof CompositeModificationPayload) { + currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( + ((CompositeModificationPayload) payload).getModification()))); + } else if (payload instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( + ((CompositeModificationByteStringPayload) payload).getModification()))); } else { - log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT); + log.error("{}: Unknown payload {} received during recovery", shardName, payload); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (IOException e) { + log.error("{}: Error extracting ModificationPayload", shardName, e); } - return Collections.emptyList(); } - private static abstract class ShardRecoveryTask implements Runnable { - - final DOMStoreWriteTransaction resultingTx; - - ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) { - this.resultingTx = resultingTx; + private void commitTransaction(DOMStoreWriteTransaction transaction) { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + try { + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } catch (Exception e) { + log.error("{}: Failed to commit Tx on recovery", shardName, e); } } - private class LogRecoveryTask extends ShardRecoveryTask { - - private final List logEntries; - - LogRecoveryTask(List logEntries, DOMStoreWriteTransaction resultingTx) { - super(resultingTx); - this.logEntries = logEntries; - } - - @Override - public void run() { - for(int i = 0; i < logEntries.size(); i++) { - MutableCompositeModification.fromSerializable( - logEntries.get(i)).apply(resultingTx); - // Null out to GC quicker. - logEntries.set(i, null); + /** + * Applies the current batched log entries to the data store. + */ + void applyCurrentLogRecoveryBatch() { + log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); + + DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); + for(ModificationPayload payload: currentLogRecoveryBatch) { + try { + MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx); + } catch (Exception e) { + log.error("{}: Error extracting ModificationPayload", shardName, e); } } - } - private class SnapshotRecoveryTask extends ShardRecoveryTask { + commitTransaction(writeTx); + + currentLogRecoveryBatch = null; + } - private final byte[] snapshotBytes; + /** + * Applies a recovered snapshot to the data store. 
+ * + * @param snapshotBytes the serialized snapshot + */ + void applyRecoveredSnapshot(final byte[] snapshotBytes) { + log.debug("{}: Applying recovered snapshot", shardName); - SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { - super(resultingTx); - this.snapshotBytes = snapshotBytes; - } + DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); - @Override - public void run() { - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - // delete everything first - resultingTx.delete(YangInstanceIdentifier.builder().build()); + writeTx.write(YangInstanceIdentifier.builder().build(), node); - // Add everything from the remote node back - resultingTx.write(YangInstanceIdentifier.builder().build(), node); - } + commitTransaction(writeTx); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c479da7312..aeb4062103 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(LOG.isDebugEnabled()) { LOG.debug("Tx {} finishCanCommit", transactionId); } - // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform - // their canCommit processing. If any one fails then we'll fail canCommit.
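The rewritten applyRecoveredSnapshot above deserializes the snapshot into a NormalizedNode, writes it at the root of a fresh write-only transaction, and then drives the store's commit cohort to completion synchronously (preCommit followed by commit, as in the new commitTransaction helper). A simplified sketch of that flow, using hypothetical Store/WriteTx/Cohort stand-ins instead of the real InMemoryDOMDataStore and DOMStoreThreePhaseCommitCohort types:

    // Sketch of the snapshot-apply flow; the interfaces below are simplified
    // stand-ins, not the actual ODL store API.
    import java.util.concurrent.CompletableFuture;

    class SnapshotApplySketch {
        interface Cohort {
            CompletableFuture<Void> preCommit();
            CompletableFuture<Void> commit();
        }
        interface WriteTx {
            void writeRoot(Object deserializedSnapshot);
            Cohort ready();
        }
        interface Store {
            WriteTx newWriteOnlyTransaction();
        }

        static void applySnapshot(Store store, byte[] snapshotBytes) {
            Object node = deserialize(snapshotBytes);       // stand-in for SerializationUtils
            WriteTx tx = store.newWriteOnlyTransaction();
            tx.writeRoot(node);                             // write the recovered tree at the root
            Cohort cohort = tx.ready();
            try {
                cohort.preCommit().get();                   // block until each phase finishes
                cohort.commit().get();
            } catch (Exception e) {
                // the real coordinator logs "Failed to commit Tx on recovery"
                throw new IllegalStateException("Snapshot apply failed", e);
            }
        }

        private static Object deserialize(byte[] bytes) {
            return new Object();                            // placeholder for illustration
        }
    }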
- Future> combinedFuture = - invokeCohorts(new CanCommitTransaction(transactionId).toSerializable()); + // For empty transactions return immediately + if(cohorts.size() == 0){ + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true); + } + returnFuture.set(Boolean.TRUE); + return; + } - combinedFuture.onComplete(new OnComplete>() { + final Object message = new CanCommitTransaction(transactionId).toSerializable(); + + final Iterator iterator = cohorts.iterator(); + + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Iterable responses) throws Throwable { - if(failure != null) { - if(LOG.isDebugEnabled()) { + public void onComplete(Throwable failure, Object response) throws Throwable { + if (failure != null) { + if (LOG.isDebugEnabled()) { LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); } returnFuture.setException(failure); @@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } boolean result = true; - for(Object response: responses) { - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - result = false; - break; - } - } else { - LOG.error("Unexpected response type {}", response.getClass()); - returnFuture.setException(new IllegalArgumentException( - String.format("Unexpected response type %s", response.getClass()))); - return; + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + result = false; } + } else { + LOG.error("Unexpected response type {}", response.getClass()); + returnFuture.setException(new IllegalArgumentException( + String.format("Unexpected response type %s", response.getClass()))); + return; } - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + + if(iterator.hasNext() && result){ + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(this, actorContext.getClientDispatcher()); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + } + returnFuture.set(Boolean.valueOf(result)); } - returnFuture.set(Boolean.valueOf(result)); + } - }, actorContext.getClientDispatcher()); + }; + + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index b775cf0a99..dc83af9a75 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -124,7 +124,7 @@ module distributed-datastore-provider { } leaf shard-journal-recovery-log-batch-size { - default 5000; + default 1000; type non-zero-uint32-type; description "The maximum number of journal log entries 
to batch on recovery for a shard before committing to the data store."; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 378bc717f4..34f0164504 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; @@ -117,7 +118,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ }; TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), "testRecovery"); + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery"); assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java similarity index 99% rename from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java rename to opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index c760349b1e..0b166f5ac8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -47,7 +47,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh * * @author Thomas Pantelis */ -public class DOMConcurrentDataCommitCoordinatorTest { +public class ConcurrentDOMDataBrokerTest { private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class); private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index fdc7e664c2..a8384d8758 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -468,7 +468,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { private void 
testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; - String shardName = "test-1"; + String shardName = "default"; // We don't want the shard to become the leader so prevent shard election from completing // by setting the election timeout, which is based on the heartbeat interval, really high. @@ -497,8 +497,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Override public void run() { try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); txCohort.set(writeTx.ready()); } catch(Exception e) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 95b1b78a19..b676cf225c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -639,7 +639,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationReleaseReady() throws Exception { + public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -648,11 +648,35 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + + verify(ready, times(1)).countDown(); + + }}; + } + + @Test + public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + verify(ready, times(1)).countDown(); }}; } + @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 3e0bc42397..e04c1a5d18 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -16,9 +16,7 @@ import akka.actor.Props; import 
akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; -import akka.japi.Procedure; import akka.pattern.Patterns; -import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -41,6 +39,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -69,13 +69,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -99,6 +99,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -158,8 +159,12 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { + // Use a non persistent provider because this test actually invokes persist on the journal + // this will cause all other messages to not be queued properly after that. 
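The two ShardManagerTest cases above pin down the new readiness rule: the wait-till-ready latch is released only once every local shard has reported both a role and a leader id, whichever of RoleChangeNotification and LeaderStateChanged arrives last. A small sketch of that gating logic, where ShardInfo is a simplified stand-in for the ShardManager's ShardInformation:

    // Readiness gating sketch: count down only when every shard is ready
    // with both a role and a known leader.
    import java.util.Collection;
    import java.util.concurrent.CountDownLatch;

    class ReadinessSketch {
        static class ShardInfo {
            volatile String role;      // e.g. "Leader", "Follower"
            volatile String leaderId;  // null until a LeaderStateChanged arrives

            boolean isReadyWithLeaderId() {
                return role != null && leaderId != null;
            }
        }

        static void maybeCountDown(Collection<ShardInfo> localShards, CountDownLatch ready) {
            for (ShardInfo info : localShards) {
                if (!info.isReadyWithLeaderId()) {
                    return;            // at least one shard is not ready yet
                }
            }
            ready.countDown();         // all shards have a role and a leader
        }
    }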
+ // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when + // it does do a persist) return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -933,7 +938,7 @@ public class ShardTest extends AbstractShardTest { // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); @@ -1385,37 +1390,15 @@ public class ShardTest extends AbstractShardTest { public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ final AtomicReference savedSnapshot = new AtomicReference<>(); - class DelegatingPersistentDataProvider implements DataPersistenceProvider { - DataPersistenceProvider delegate; - - DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { - this.delegate = delegate; - } - - @Override - public boolean isRecoveryApplicable() { - return delegate.isRecoveryApplicable(); - } - - @Override - public void persist(T o, Procedure procedure) { - delegate.persist(o, procedure); + class TestPersistentDataProvider extends DelegatingPersistentDataProvider { + TestPersistentDataProvider(DataPersistenceProvider delegate) { + super(delegate); } @Override public void saveSnapshot(Object o) { savedSnapshot.set(o); - delegate.saveSnapshot(o); - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - delegate.deleteSnapshots(criteria); - } - - @Override - public void deleteMessages(long sequenceNumber) { - delegate.deleteMessages(sequenceNumber); + super.saveSnapshot(o); } } @@ -1423,29 +1406,32 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { - DelegatingPersistentDataProvider delegating; + class TestShard extends Shard { - @Override - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } + protected TestShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + super(name, peerAddresses, datastoreContext, schemaContext); + setPersistence(new TestPersistentDataProvider(super.persistence())); + } - return delegating; - } + @Override + protected void commitSnapshot(final long sequenceNumber) { + super.commitSnapshot(sequenceNumber); + latch.get().countDown(); + } - @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); - } - }; + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + } + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new TestShard(shardID, 
Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); } }; @@ -1458,8 +1444,9 @@ public class ShardTest extends AbstractShardTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); - shard.tell(capture, getRef()); + // Trigger creation of a snapshot by ensuring + RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1471,7 +1458,7 @@ public class ShardTest extends AbstractShardTest { latch.set(new CountDownLatch(1)); savedSnapshot.set(null); - shard.tell(capture, getRef()); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1540,14 +1527,14 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard1 = TestActorRef.create(getSystem(), persistentProps, "testPersistence1"); - assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable()); shard1.tell(PoisonPill.getInstance(), ActorRef.noSender()); TestActorRef shard2 = TestActorRef.create(getSystem(), nonPersistentProps, "testPersistence2"); - assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable()); shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1563,19 +1550,19 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); waitUntilLeader(shard); shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", false, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 647b6e7b54..d595adc8bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag import 
org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), isA(requestType), any(Timeout.class)); + + doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) + .when(actorContext).getTransactionCommitOperationTimeout(); } private void verifyCohortInvocations(int nCohorts, Class requestType) { @@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ListenableFuture future = proxy.canCommit(); - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); + Boolean actual = future.get(5, TimeUnit.SECONDS); - verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + assertEquals("canCommit", false, actual); + + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); } @Test(expected = TestException.class) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 9761ed8615..4240608036 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -21,6 +21,10 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", + "junk"); + + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); @@ -31,6 +35,7 @@ public class TestModel { private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). node(OUTER_LIST_QNAME).build(); public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). 
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index cdce946aba..db9b702fed 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -325,31 +324,24 @@ public final class NetconfDevice implements RemoteDevice requiredSources = Sets.newHashSet(Collections2.transform( - remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION)); - - // If monitoring is not supported, we will still attempt to create schema, sources might be already provided final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); - final Set providedSources = Sets.newHashSet(Collections2.transform( - availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION)); - - final Set requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources); + final Set requiredSources = Sets.newHashSet(remoteSessionCapabilities.getModuleBasedCaps()); + final Set providedSources = availableSchemas.getAvailableYangSchemasQNames(); + final Set requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources); if (!requiredSourcesNotProvided.isEmpty()) { logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}", id, requiredSourcesNotProvided); logger.warn("{}: Attempting to build schema context from required sources", id); } - // Here all the sources reported in netconf monitoring are merged with those reported in hello. // It is necessary to perform this since submodules are not mentioned in hello but still required. // This clashes with the option of a user to specify supported yang models manually in configuration for netconf-connector // and as a result one is not able to fully override yang models of a device. It is only possible to add additional models. 
- final Set providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); + final Set providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); if (!providedSourcesNotRequired.isEmpty()) { logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", id, providedSourcesNotRequired); @@ -366,22 +358,30 @@ public final class NetconfDevice implements RemoteDevice requiredSources; - private final Collection providedSources; + private final Set requiredSources; + private final Set providedSources; - public DeviceSources(final Collection requiredSources, final Collection providedSources) { + public DeviceSources(final Set requiredSources, final Set providedSources) { this.requiredSources = requiredSources; this.providedSources = providedSources; } - public Collection getRequiredSources() { + public Set getRequiredSourcesQName() { return requiredSources; } - public Collection getProvidedSources() { + public Set getProvidedSourcesQName() { return providedSources; } + public Collection getRequiredSources() { + return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION); + } + + public Collection getProvidedSources() { + return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION); + } + } /** @@ -414,7 +414,9 @@ public final class NetconfDevice implements RemoteDevice filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + final Collection filteredQNames = Sets.difference(deviceSources.getProvidedSourcesQName(), capabilities.getUnresolvedCapabilites().keySet()); capabilities.addCapabilities(filteredQNames); capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result)); @@ -470,27 +472,36 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifiers(final Collection identifiers) { - final Collection qNames = new HashSet<>(); - for (final SourceIdentifier source : identifiers) { - final Optional qname = getQNameFromSourceIdentifier(source); - if (qname.isPresent()) { - qNames.add(qname.get()); + final Collection qNames = Collections2.transform(identifiers, new Function() { + @Override + public QName apply(final SourceIdentifier sourceIdentifier) { + return getQNameFromSourceIdentifier(sourceIdentifier); } - } + }); + if (qNames.isEmpty()) { logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers); } return qNames; } - private Optional getQNameFromSourceIdentifier(final SourceIdentifier identifier) { - for (final QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { - if (qname.getLocalName().equals(identifier.getName()) - && qname.getFormattedRevision().equals(identifier.getRevision())) { - return Optional.of(qname); + private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) { + // Required sources are all required and provided merged in DeviceSourcesResolver + for (final QName qname : deviceSources.getRequiredSourcesQName()) { + if(qname.getLocalName().equals(identifier.getName()) == false) { + continue; + } + + if(identifier.getRevision().equals(SourceIdentifier.NOT_PRESENT_FORMATTED_REVISION) && + qname.getRevision() == null) { + return qname; + } + + if (qname.getFormattedRevision().equals(identifier.getRevision())) { + return qname; } } - throw new 
IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier); + throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier + " Available: " + deviceSources.getRequiredSourcesQName()); } } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java index 942e4bbaeb..645028b13f 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java @@ -174,7 +174,7 @@ public final class NetconfStateSchemas { public final static class RemoteYangSchema { private final QName qname; - private RemoteYangSchema(final QName qname) { + RemoteYangSchema(final QName qname) { this.qname = qname; } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java index 0f643789ec..93f4df83e4 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java @@ -19,7 +19,9 @@ import static org.mockito.Mockito.verify; import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import java.io.InputStream; import java.util.ArrayList; @@ -46,6 +48,7 @@ import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPr import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.Module; @@ -132,6 +135,7 @@ public class NetconfDeviceTest { public void testNetconfDeviceMissingSource() throws Exception { final RemoteDeviceHandler facade = getFacade(); final NetconfDeviceCommunicator listener = getListener(); + final SchemaContext schema = getSchema(); final SchemaContextFactory schemaFactory = getSchemaFactory(); @@ -143,13 +147,23 @@ public class NetconfDeviceTest { if(((Collection) invocation.getArguments()[0]).size() == 2) { return Futures.immediateFailedCheckedFuture(schemaResolutionException); } else { - return Futures.immediateCheckedFuture(getSchema()); + return Futures.immediateCheckedFuture(schema); } } }).when(schemaFactory).createSchemaContext(anyCollectionOf(SourceIdentifier.class)); final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO - = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver); + = new 
NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, new NetconfStateSchemas.NetconfStateSchemasResolver() { + @Override + public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) { + final Module first = Iterables.getFirst(schema.getModules(), null); + final QName qName = QName.create(first.getQNameModule(), first.getName()); + final NetconfStateSchemas.RemoteYangSchema source1 = new NetconfStateSchemas.RemoteYangSchema(qName); + final NetconfStateSchemas.RemoteYangSchema source2 = new NetconfStateSchemas.RemoteYangSchema(QName.create(first.getQNameModule(), "test-module2")); + return new NetconfStateSchemas(Sets.newHashSet(source1, source2)); + } + }); + final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), true); // Monitoring supported final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2)); diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java index bc84734190..50676c57c1 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java @@ -243,8 +243,6 @@ public class EditConfig extends AbstractConfigNetconfOperation { } Date revision = module.getRevision(); - Preconditions.checkState(!revisionsByNamespace.containsKey(revision), - "Duplicate revision %s for namespace %s", revision, namespace); IdentityMapping identityMapping = revisionsByNamespace.get(revision); if(identityMapping == null) { diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java index 0d3370548a..283ec424ba 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java @@ -8,11 +8,16 @@ package org.opendaylight.controller.netconf.confignetconfconnector.osgi; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry; import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator; @@ -23,6 +28,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode; import 
org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleIdentifierImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,12 +105,31 @@ final class YangStoreSnapshot implements YangStoreContext { @Override public Set getModules() { - return schemaContext.getModules(); + final Set modules = Sets.newHashSet(schemaContext.getModules()); + for (final Module module : schemaContext.getModules()) { + modules.addAll(module.getSubmodules()); + } + return modules; } @Override public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) { - return schemaContext.getModuleSource(moduleIdentifier).get(); + final Optional moduleSource = schemaContext.getModuleSource(moduleIdentifier); + if(moduleSource.isPresent()) { + return moduleSource.get(); + } else { + try { + return Iterables.find(getModules(), new Predicate() { + @Override + public boolean apply(final Module input) { + final ModuleIdentifierImpl id = new ModuleIdentifierImpl(input.getName(), Optional.fromNullable(input.getNamespace()), Optional.fromNullable(input.getRevision())); + return id.equals(moduleIdentifier); + } + }).getSource(); + } catch (final NoSuchElementException e) { + throw new IllegalArgumentException("Source for yang module " + moduleIdentifier + " not found", e); + } + } } @Override diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModule.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModule.java index bf8bdb06f3..bf41f190e6 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModule.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModule.java @@ -10,7 +10,7 @@ package org.opendaylight.controller.config.yang.netconf.mdsal.mapper; import org.opendaylight.controller.netconf.mdsal.connector.MdsalNetconfOperationServiceFactory; -public class NetconfMdsalMapperModule extends org.opendaylight.controller.config.yang.netconf.mdsal.mapper.AbstractNetconfMdsalMapperModule { +public class NetconfMdsalMapperModule extends org.opendaylight.controller.config.yang.netconf.mdsal.mapper.AbstractNetconfMdsalMapperModule{ public NetconfMdsalMapperModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } @@ -26,13 +26,15 @@ public class NetconfMdsalMapperModule extends org.opendaylight.controller.config @Override public java.lang.AutoCloseable createInstance() { - final MdsalNetconfOperationServiceFactory mdsalNetconfOperationServiceFactory = new MdsalNetconfOperationServiceFactory(getRootSchemaServiceDependency(), getDomBrokerDependency()) { - @Override - public void close() throws Exception { - super.close(); - getMapperAggregatorDependency().onRemoveNetconfOperationServiceFactory(this); - } - }; + final MdsalNetconfOperationServiceFactory mdsalNetconfOperationServiceFactory = + new MdsalNetconfOperationServiceFactory(getRootSchemaServiceDependency()) { + @Override + public void close() throws Exception { + super.close(); + 
getMapperAggregatorDependency().onRemoveNetconfOperationServiceFactory(this); + } + }; + getDomBrokerDependency().registerConsumer(mdsalNetconfOperationServiceFactory); getMapperAggregatorDependency().onAddNetconfOperationServiceFactory(mdsalNetconfOperationServiceFactory); return mdsalNetconfOperationServiceFactory; } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationService.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationService.java index cc22dd51aa..2f5bb098f5 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationService.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationService.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.netconf.mdsal.connector; import java.util.Set; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; @@ -18,8 +19,8 @@ public class MdsalNetconfOperationService implements NetconfOperationService { private final OperationProvider operationProvider; public MdsalNetconfOperationService(final CurrentSchemaContext schemaContext, final String netconfSessionIdForReporting, - final DOMDataBroker dataBroker) { - this.operationProvider = new OperationProvider(netconfSessionIdForReporting, schemaContext, dataBroker); + final DOMDataBroker dataBroker, final DOMRpcService rpcService) { + this.operationProvider = new OperationProvider(netconfSessionIdForReporting, schemaContext, dataBroker, rpcService); } @Override diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java index 89ce149e12..96244fdc68 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java @@ -8,36 +8,44 @@ package org.opendaylight.controller.netconf.mdsal.connector; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.api.Capability; import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; import org.opendaylight.controller.netconf.util.capability.BasicCapability; import org.opendaylight.controller.netconf.util.capability.YangModuleCapability; +import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; +import org.opendaylight.controller.sal.core.api.Consumer; 
import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MdsalNetconfOperationServiceFactory implements NetconfOperationServiceFactory, AutoCloseable { +public class MdsalNetconfOperationServiceFactory implements NetconfOperationServiceFactory, Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MdsalNetconfOperationServiceFactory.class); - private final DOMDataBroker dataBroker; + private ConsumerSession session = null; + private DOMDataBroker dataBroker = null; + private DOMRpcService rpcService = null; private final CurrentSchemaContext currentSchemaContext; - public MdsalNetconfOperationServiceFactory(final SchemaService schemaService, final DOMDataBroker domDataBroker) { + public MdsalNetconfOperationServiceFactory(final SchemaService schemaService) { this.currentSchemaContext = new CurrentSchemaContext(Preconditions.checkNotNull(schemaService)); - this.dataBroker = Preconditions.checkNotNull(domDataBroker); } @Override public MdsalNetconfOperationService createService(final String netconfSessionIdForReporting) { - return new MdsalNetconfOperationService(currentSchemaContext, netconfSessionIdForReporting, dataBroker); + Preconditions.checkState(dataBroker != null, "MD-SAL provider not yet initialized"); + return new MdsalNetconfOperationService(currentSchemaContext, netconfSessionIdForReporting, dataBroker, rpcService); } @Override @@ -50,28 +58,53 @@ public class MdsalNetconfOperationServiceFactory implements NetconfOperationServ return transformCapabilities(currentSchemaContext.getCurrentContext()); } - static Set transformCapabilities(final SchemaContext currentContext1) { + static Set transformCapabilities(final SchemaContext currentContext) { final Set capabilities = new HashSet<>(); // [RFC6241] 8.3. Candidate Configuration Capability capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0")); - final SchemaContext currentContext = currentContext1; final Set modules = currentContext.getModules(); for (final Module module : modules) { - if(currentContext.getModuleSource(module).isPresent()) { - capabilities.add(new YangModuleCapability(module, currentContext.getModuleSource(module).get())); - } else { - LOG.warn("Missing source for module {}. This module will not be available from netconf server", - module); + Optional cap = moduleToCapability(module); + if(cap.isPresent()) { + capabilities.add(cap.get()); + } + for (final Module submodule : module.getSubmodules()) { + cap = moduleToCapability(submodule); + if(cap.isPresent()) { + capabilities.add(cap.get()); + } } } return capabilities; } + private static Optional moduleToCapability(final Module module) { + final String source = module.getSource(); + if(source !=null) { + return Optional.of(new YangModuleCapability(module, source)); + } else { + LOG.warn("Missing source for module {}. 
This module will not be available from netconf server", + module); + } + return Optional.absent(); + } + @Override public AutoCloseable registerCapabilityListener(final CapabilityListener listener) { return currentSchemaContext.registerCapabilityListener(listener); } + @Override + public void onSessionInitiated(ConsumerSession session) { + this.session = Preconditions.checkNotNull(session); + this.dataBroker = this.session.getService(DOMDataBroker.class); + this.rpcService = this.session.getService(DOMRpcService.class); + } + + @Override + public Collection getConsumerFunctionality() { + return Collections.emptySet(); + } } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/OperationProvider.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/OperationProvider.java index c881ae2e4e..8403dccc72 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/OperationProvider.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/OperationProvider.java @@ -11,11 +11,13 @@ package org.opendaylight.controller.netconf.mdsal.connector; import com.google.common.collect.Sets; import java.util.Set; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; import org.opendaylight.controller.netconf.mdsal.connector.ops.Commit; import org.opendaylight.controller.netconf.mdsal.connector.ops.DiscardChanges; import org.opendaylight.controller.netconf.mdsal.connector.ops.EditConfig; import org.opendaylight.controller.netconf.mdsal.connector.ops.Lock; +import org.opendaylight.controller.netconf.mdsal.connector.ops.RuntimeRpc; import org.opendaylight.controller.netconf.mdsal.connector.ops.Unlock; import org.opendaylight.controller.netconf.mdsal.connector.ops.get.Get; import org.opendaylight.controller.netconf.mdsal.connector.ops.get.GetConfig; @@ -25,14 +27,16 @@ final class OperationProvider { private final String netconfSessionIdForReporting; private final CurrentSchemaContext schemaContext; private final DOMDataBroker dataBroker; + private final DOMRpcService rpcService; private final TransactionProvider transactionProvider; - public OperationProvider(final String netconfSessionIdForReporting, final CurrentSchemaContext schemaContext, final DOMDataBroker dataBroker) { + public OperationProvider(final String netconfSessionIdForReporting, final CurrentSchemaContext schemaContext, + final DOMDataBroker dataBroker, final DOMRpcService rpcService) { this.netconfSessionIdForReporting = netconfSessionIdForReporting; this.schemaContext = schemaContext; this.dataBroker = dataBroker; - this.transactionProvider = new TransactionProvider(dataBroker, netconfSessionIdForReporting); - + this.rpcService = rpcService; + this.transactionProvider = new TransactionProvider(this.dataBroker, netconfSessionIdForReporting); } Set getOperations() { @@ -43,7 +47,8 @@ final class OperationProvider { new Get(netconfSessionIdForReporting, schemaContext, transactionProvider), new GetConfig(netconfSessionIdForReporting, schemaContext, transactionProvider), new Lock(netconfSessionIdForReporting), - new Unlock(netconfSessionIdForReporting) + new Unlock(netconfSessionIdForReporting), + new RuntimeRpc(netconfSessionIdForReporting, 
schemaContext, rpcService) ); } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Commit.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Commit.java index 5a980c44d3..47e8f80585 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Commit.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Commit.java @@ -12,7 +12,7 @@ import com.google.common.base.Optional; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; @@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class Commit extends AbstractLastNetconfOperation{ +public class Commit extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(Commit.class); diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/DiscardChanges.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/DiscardChanges.java index b47bb18434..ce4de18ee6 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/DiscardChanges.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/DiscardChanges.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorT import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class DiscardChanges extends AbstractLastNetconfOperation{ +public class DiscardChanges extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(DiscardChanges.class); @@ -59,4 +59,5 @@ public class DiscardChanges extends AbstractLastNetconfOperation{ protected String getOperationName() { return OPERATION_NAME; } + } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java index 
aebdfd9baf..fbefb5c56d 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java @@ -25,7 +25,7 @@ import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext; import org.opendaylight.controller.netconf.mdsal.connector.TransactionProvider; import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.yangtools.yang.data.api.ModifyAction; @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class EditConfig extends AbstractLastNetconfOperation { +public class EditConfig extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(EditConfig.class); @@ -229,4 +229,5 @@ public class EditConfig extends AbstractLastNetconfOperation { protected String getOperationName() { return OPERATION_NAME; } + } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Lock.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Lock.java index db912c5fc0..ef94f69f7a 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Lock.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Lock.java @@ -13,7 +13,7 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class Lock extends AbstractLastNetconfOperation{ +public class Lock extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(Lock.class); @@ -61,4 +61,5 @@ public class Lock extends AbstractLastNetconfOperation{ protected String getOperationName() { return OPERATION_NAME; } + } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java new file mode 100644 index 0000000000..ff7d30d574 --- /dev/null +++ 
b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.mdsal.connector.ops; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.CheckedFuture; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nullable; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import javax.xml.transform.dom.DOMResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorTag; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; +import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext; +import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils; +import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory; +import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; +import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Attr; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +public class RuntimeRpc extends AbstractSingletonNetconfOperation { + + private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeRpc.class); + + private final CurrentSchemaContext schemaContext; + private static final XMLOutputFactory XML_OUTPUT_FACTORY; + + static { + XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); + XML_OUTPUT_FACTORY.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true); + } + + private final DOMRpcService rpcService; + + public RuntimeRpc(final String netconfSessionIdForReporting, CurrentSchemaContext schemaContext, DOMRpcService rpcService) { + super(netconfSessionIdForReporting); + this.schemaContext = schemaContext; + this.rpcService = rpcService; + } + + @Override + protected HandlingPriority canHandle(final String netconfOperationName, final String namespace) { + final URI namespaceURI = createNsUri(namespace); + final Optional module = getModule(namespaceURI); + + if (!module.isPresent()) { + LOG.debug("Cannot handle rpc: {}, {}", netconfOperationName, namespace); + return HandlingPriority.CANNOT_HANDLE; + } + + getRpcDefinitionFromModule(module.get(), namespaceURI, netconfOperationName); + return HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY; + + } + + @Override + protected String getOperationName() { + throw new UnsupportedOperationException("Runtime rpc does not have a stable name"); + } + + private URI createNsUri(final String namespace) { + final URI namespaceURI; + try { + namespaceURI = new URI(namespace); + } catch (URISyntaxException e) { + // Cannot occur, namespace in parsed XML cannot be invalid URI + throw new IllegalStateException("Unable to parse URI " + namespace, e); + } + return namespaceURI; + } + + //this returns module with the newest revision if more then 1 module with same namespace is found + private Optional getModule(final URI namespaceURI) { + return Optional.of(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null)); + } + + private Optional getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) { + for (RpcDefinition rpcDef : module.getRpcs()) { + if (rpcDef.getQName().getNamespace().equals(namespaceURI) + && rpcDef.getQName().getLocalName().equals(name)) { + return Optional.of(rpcDef); + } + } + return Optional.absent(); + } + + @Override + protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException { + + final String netconfOperationName = operationElement.getName(); + final String netconfOperationNamespace; + try { + netconfOperationNamespace = operationElement.getNamespace(); + } catch (MissingNameSpaceException e) { + LOG.debug("Cannot retrieve netconf operation namespace from message due to ", e); + throw new NetconfDocumentedException("Cannot retrieve netconf operation namespace from message", + ErrorType.protocol, ErrorTag.unknown_namespace, ErrorSeverity.error); + } + + final URI namespaceURI = createNsUri(netconfOperationNamespace); + final Optional moduleOptional = getModule(namespaceURI); + + if (!moduleOptional.isPresent()) { + throw new NetconfDocumentedException("Unable to find module in Schema Context with namespace and name : " + + namespaceURI + " " + netconfOperationName + schemaContext.getCurrentContext(), + ErrorType.application, ErrorTag.bad_element, ErrorSeverity.error); + } + + final Optional rpcDefinitionOptional = getRpcDefinitionFromModule(moduleOptional.get(), namespaceURI, netconfOperationName); + + if (!rpcDefinitionOptional.isPresent()) { + throw new NetconfDocumentedException("Unable to find RpcDefinition with namespace and name : " + namespaceURI + " " + 
netconfOperationName, + ErrorType.application, ErrorTag.bad_element, ErrorSeverity.error); + } + + final RpcDefinition rpcDefinition = rpcDefinitionOptional.get(); + final SchemaPath schemaPath = SchemaPath.create(Collections.singletonList(rpcDefinition.getQName()), true); + final NormalizedNode inputNode = rpcToNNode(operationElement, rpcDefinition.getInput()); + + final CheckedFuture rpcFuture = rpcService.invokeRpc(schemaPath, inputNode); + try { + final DOMRpcResult result = rpcFuture.checkedGet(); + if (result.getResult() == null) { + return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.of(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0)); + } + return (Element) transformNormalizedNode(document, result.getResult(), rpcDefinition.getOutput().getPath()); + } catch (DOMRpcException e) { + throw NetconfDocumentedException.wrap(e); + } + } + + @Override + public Document handle(final Document requestMessage, + final NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException { + + final XmlElement requestElement = getRequestElementWithCheck(requestMessage); + + final Document document = XmlUtil.newDocument(); + + final XmlElement operationElement = requestElement.getOnlyChildElement(); + final Map attributes = requestElement.getAttributes(); + + final Element response = handle(document, operationElement, subsequentOperation); + final Element rpcReply = XmlUtil.createElement(document, XmlNetconfConstants.RPC_REPLY_KEY, Optional.of(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0)); + + if(XmlElement.fromDomElement(response).hasNamespace()) { + rpcReply.appendChild(response); + } else { + final NodeList list = response.getChildNodes(); + if (list.getLength() == 0) { + rpcReply.appendChild(response); + } else { + while (list.getLength() != 0) { + rpcReply.appendChild(list.item(0)); + } + } + } + + for (Attr attribute : attributes.values()) { + rpcReply.setAttributeNode((Attr) document.importNode(attribute, true)); + } + document.appendChild(rpcReply); + return document; + } + + //TODO move all occurences of this method in mdsal netconf(and xml factories) to a utility class + private Node transformNormalizedNode(final Document document, final NormalizedNode data, final SchemaPath rpcOutputPath) { + final DOMResult result = new DOMResult(document.createElement(XmlNetconfConstants.RPC_REPLY_KEY)); + + final XMLStreamWriter xmlWriter = getXmlStreamWriter(result); + + final NormalizedNodeStreamWriter nnStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter, + schemaContext.getCurrentContext(), rpcOutputPath); + + final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(nnStreamWriter); + + writeRootElement(xmlWriter, nnWriter, (ContainerNode) data); + try { + nnStreamWriter.close(); + xmlWriter.close(); + } catch (IOException | XMLStreamException e) { + LOG.warn("Error while closing streams", e); + } + + return result.getNode(); + } + + private XMLStreamWriter getXmlStreamWriter(final DOMResult result) { + try { + return XML_OUTPUT_FACTORY.createXMLStreamWriter(result); + } catch (final XMLStreamException e) { + throw new RuntimeException(e); + } + } + + private void writeRootElement(final XMLStreamWriter xmlWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) { + try { + for (final DataContainerChild child : data.getValue()) { + nnWriter.write(child); + } + nnWriter.flush(); + xmlWriter.flush(); + } catch (XMLStreamException | IOException e) { + Throwables.propagate(e); + } + } + 
+ /** + * Parses xml element rpc input into normalized node or null if rpc does not take any input + * @param oElement rpc xml element + * @param input input container schema node, or null if rpc does not take any input + * @return parsed rpc into normalized node, or null if input schema is null + */ + @Nullable + private NormalizedNode rpcToNNode(final XmlElement oElement, @Nullable final ContainerSchemaNode input) { + return input == null ? null : DomToNormalizedNodeParserFactory + .getInstance(DomUtils.defaultValueCodecProvider(), schemaContext.getCurrentContext()) + .getContainerNodeParser() + .parse(Collections.singletonList(oElement.getDomElement()), input); + } + +} diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Unlock.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Unlock.java index 2dd26633dd..08ffe8b2a0 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Unlock.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Unlock.java @@ -11,7 +11,7 @@ package org.opendaylight.controller.netconf.mdsal.connector.ops; import com.google.common.base.Optional; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class Unlock extends AbstractLastNetconfOperation{ +public class Unlock extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(Unlock.class); diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/AbstractGet.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/AbstractGet.java index 711cb8145b..9a66ceb5bc 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/AbstractGet.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/AbstractGet.java @@ -29,7 +29,7 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorT import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext; import org.opendaylight.controller.netconf.mdsal.connector.ops.Datastore; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.sal.connect.netconf.util.InstanceIdToNodes; import org.opendaylight.yangtools.yang.common.QName; @@ -56,7 +56,7 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import 
org.w3c.dom.Node; -public abstract class AbstractGet extends AbstractLastNetconfOperation { +public abstract class AbstractGet extends AbstractSingletonNetconfOperation { private static final Logger LOG = LoggerFactory.getLogger(AbstractGet.class); diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/yang/netconf-mdsal-mapper.yang b/opendaylight/netconf/mdsal-netconf-connector/src/main/yang/netconf-mdsal-mapper.yang index 9d9966e8f1..c787287da5 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/yang/netconf-mdsal-mapper.yang +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/yang/netconf-mdsal-mapper.yang @@ -39,8 +39,8 @@ module netconf-mdsal-mapper { container dom-broker { uses config:service-ref { refine type { - mandatory false; - config:required-identity md-sal-dom:dom-async-data-broker; + mandatory true; + config:required-identity md-sal-dom:dom-broker-osgi-registry; } } } diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java b/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java new file mode 100644 index 0000000000..32eb08c2e7 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.mdsal.connector.ops; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.custommonkey.xmlunit.DetailedDiff; +import org.custommonkey.xmlunit.Diff; +import org.custommonkey.xmlunit.XMLUnit; +import org.custommonkey.xmlunit.examples.RecursiveElementNameAndTextQualifier; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity; +import 
org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorTag; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorType; +import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; +import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext; +import org.opendaylight.controller.netconf.util.test.XmlFileLoader; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; +import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException; +import org.opendaylight.yangtools.yang.parser.builder.impl.BuilderUtils; +import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; + +public class RuntimeRpcTest { + + private static final Logger LOG = LoggerFactory.getLogger(RuntimeRpcTest.class); + + private String sessionIdForReporting = "netconf-test-session1"; + + private static Document RPC_REPLY_OK = null; + + static { + try { + RPC_REPLY_OK = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/runtimerpc-ok-reply.xml"); + } catch (Exception e) { + LOG.debug("unable to load rpc reply ok.", e); + RPC_REPLY_OK = XmlUtil.newDocument(); + } + } + + private DOMRpcService rpcServiceVoidInvoke = new DOMRpcService() { + @Nonnull + @Override + public CheckedFuture invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode input) { + return Futures.immediateCheckedFuture((DOMRpcResult) new DefaultDOMRpcResult(null, Collections.emptyList())); + } + + @Nonnull + @Override + public ListenerRegistration registerRpcListener(@Nonnull T listener) { + return null; + } + }; + + private DOMRpcService rpcServiceFailedInvocation = new DOMRpcService() { + @Nonnull + @Override + public CheckedFuture invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode input) { + return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcException("rpc invocation not implemented yet") { + }); + } + + @Nonnull + @Override + public ListenerRegistration registerRpcListener(@Nonnull T listener) { + return null; + } + }; + + private DOMRpcService rpcServiceSuccesfullInvocation = new DOMRpcService() { + @Nonnull + @Override + public CheckedFuture invokeRpc(@Nonnull SchemaPath type, @Nullable NormalizedNode input) { + Collection> children = (Collection) input.getValue(); + Module module = schemaContext.findModuleByNamespaceAndRevision(type.getLastComponent().getNamespace(), null); + 
RpcDefinition rpcDefinition = getRpcDefinitionFromModule(module, module.getNamespace(), type.getLastComponent().getLocalName()); + ContainerSchemaNode outputSchemaNode = rpcDefinition.getOutput(); + ContainerNode node = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(outputSchemaNode.getQName())) + .withValue(children).build(); + + return Futures.immediateCheckedFuture((DOMRpcResult) new DefaultDOMRpcResult(node)); + } + + @Nonnull + @Override + public ListenerRegistration registerRpcListener(@Nonnull T listener) { + return null; + } + }; + + private SchemaContext schemaContext = null; + private CurrentSchemaContext currentSchemaContext = null; + @Mock + private SchemaService schemaService; + @Mock + private SchemaContextListener listener; + @Mock + private ListenerRegistration registration; + + @Before + public void setUp() throws Exception { + + initMocks(this); + doNothing().when(registration).close(); + doReturn(listener).when(registration).getInstance(); + doNothing().when(schemaService).addModule(any(Module.class)); + doNothing().when(schemaService).removeModule(any(Module.class)); + doReturn(schemaContext).when(schemaService).getGlobalContext(); + doReturn(schemaContext).when(schemaService).getSessionContext(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ((SchemaContextListener) invocationOnMock.getArguments()[0]).onGlobalContextUpdated(schemaContext); + return registration; + } + }).when(schemaService).registerSchemaContextListener(any(SchemaContextListener.class)); + + XMLUnit.setIgnoreWhitespace(true); + XMLUnit.setIgnoreAttributeOrder(true); + + this.schemaContext = parseSchemas(getYangSchemas()); + this.currentSchemaContext = new CurrentSchemaContext(schemaService); + } + + @Test + public void testVoidOutputRpc() throws Exception { + RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceVoidInvoke); + + Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-void-output.xml"); + HandlingPriority priority = rpc.canHandle(rpcDocument); + Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE); + + Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + + verifyResponse(response, RPC_REPLY_OK); + } + + @Test + public void testSuccesfullNonVoidInvocation() throws Exception { + RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceSuccesfullInvocation); + + Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid.xml"); + HandlingPriority priority = rpc.canHandle(rpcDocument); + Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE); + + Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid-control.xml")); + } + + @Test + public void testFailedInvocation() throws Exception { + RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceFailedInvocation); + + Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid.xml"); + HandlingPriority priority = rpc.canHandle(rpcDocument); + Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE); + + try { + rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + 
fail("should have failed with rpc invocation not implemented yet"); + } catch (NetconfDocumentedException e) { + assertTrue(e.getErrorType() == ErrorType.application); + assertTrue(e.getErrorSeverity() == ErrorSeverity.error); + assertTrue(e.getErrorTag() == ErrorTag.operation_failed); + } + } + + @Test + public void testVoidInputOutputRpc() throws Exception { + RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceVoidInvoke); + + Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-void-input-output.xml"); + HandlingPriority priority = rpc.canHandle(rpcDocument); + Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE); + + Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + + verifyResponse(response, RPC_REPLY_OK); + } + + private void verifyResponse(Document response, Document template) { + DetailedDiff dd = new DetailedDiff(new Diff(response, template)); + dd.overrideElementQualifier(new RecursiveElementNameAndTextQualifier()); + assertTrue(dd.similar()); + } + + private RpcDefinition getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) { + for (RpcDefinition rpcDef : module.getRpcs()) { + if (rpcDef.getQName().getNamespace().equals(namespaceURI) + && rpcDef.getQName().getLocalName().equals(name)) { + return rpcDef; + } + } + + return null; + + } + + private Collection getYangSchemas() { + final List schemaPaths = Arrays.asList("/yang/mdsal-netconf-rpc-test.yang"); + final List schemas = new ArrayList<>(); + + for (String schemaPath : schemaPaths) { + InputStream resourceAsStream = getClass().getResourceAsStream(schemaPath); + schemas.add(resourceAsStream); + } + + return schemas; + } + + private SchemaContext parseSchemas(Collection schemas) throws IOException, YangSyntaxErrorException { + final YangParserImpl parser = new YangParserImpl(); + Collection sources = BuilderUtils.streamsToByteSources(schemas); + return parser.parseSources(sources); + } +} \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid-control.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid-control.xml new file mode 100644 index 0000000000..139885b9f3 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid-control.xml @@ -0,0 +1,17 @@ + + + + + test rpc input string 1 + + + test rpc input string 2 + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid.xml new file mode 100644 index 0000000000..b5cc5ecb40 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-nonvoid.xml @@ -0,0 +1,19 @@ + + + + + + test rpc input string 1 + + + test rpc input string 2 + + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-input-output.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-input-output.xml new file mode 100644 index 0000000000..c6b09f86c9 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-input-output.xml @@ -0,0 +1,12 @@ + + + + + 
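
For illustration only: verifyResponse() above compares the actual reply with the control document through XMLUnit's DetailedDiff and a RecursiveElementNameAndTextQualifier, so formatting differences do not fail the assertion. A small self-contained sketch of that comparison follows; the inline XML strings and class name are invented for the example and are not the test resources added by this patch.

import static org.junit.Assert.assertTrue;

import org.custommonkey.xmlunit.DetailedDiff;
import org.custommonkey.xmlunit.Diff;
import org.custommonkey.xmlunit.XMLUnit;
import org.custommonkey.xmlunit.examples.RecursiveElementNameAndTextQualifier;

public class XmlSimilaritySketch {

    public static void main(final String[] args) throws Exception {
        // Same switches the test class sets in setUp().
        XMLUnit.setIgnoreWhitespace(true);
        XMLUnit.setIgnoreAttributeOrder(true);

        // Identical content, different formatting: still "similar".
        final String control = "<rpc-reply><out>1</out><out2>2</out2></rpc-reply>";
        final String actual = "<rpc-reply>\n  <out>1</out>\n  <out2>2</out2>\n</rpc-reply>";

        final DetailedDiff dd = new DetailedDiff(new Diff(control, actual));
        // Match elements by name and nested text rather than by position.
        dd.overrideElementQualifier(new RecursiveElementNameAndTextQualifier());
        assertTrue(dd.similar());
    }
}
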
\ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-output.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-output.xml new file mode 100644 index 0000000000..a963865257 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-void-output.xml @@ -0,0 +1,19 @@ + + + + + + test rpc input string 1 + + + test rpc input string 2 + + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/runtimerpc-ok-reply.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/runtimerpc-ok-reply.xml new file mode 100644 index 0000000000..e44046eb4e --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/runtimerpc-ok-reply.xml @@ -0,0 +1,11 @@ + + + + + \ No newline at end of file diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang new file mode 100644 index 0000000000..d493840828 --- /dev/null +++ b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang @@ -0,0 +1,44 @@ +module rpc-test { + yang-version 1; + namespace "urn:opendaylight:mdsal:mapping:rpc:test"; + prefix "rpc"; + + rpc void-input-output-rpc { + + } + + rpc void-output-rpc { + input { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + } + + rpc nonvoid-rpc { + input { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + + output { + leaf test-string { + type string; + } + + leaf test-string2 { + type string; + } + } + } +} + diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java index 06c695c25a..f4017fbe58 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -45,6 +45,9 @@ public class NetconfClientSessionNegotiator extends private static final XPathExpression sessionIdXPath = XMLNetconfUtil .compileXPath("/netconf:hello/netconf:session-id"); + private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil + .compileXPath("/hello/session-id"); + private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0"; protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences, @@ -113,16 +116,22 @@ public class NetconfClientSessionNegotiator extends } private long extractSessionId(final Document doc) { - final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE); - Preconditions.checkState(sessionIdNode != null, ""); - String textContent = 
sessionIdNode.getTextContent(); - if (textContent == null || textContent.equals("")) { - throw new IllegalStateException("Session id not received from server"); + String textContent = getSessionIdWithXPath(doc, sessionIdXPath); + if (Strings.isNullOrEmpty(textContent)) { + textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace); + if (Strings.isNullOrEmpty(textContent)) { + throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc)); + } } return Long.valueOf(textContent); } + private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) { + final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE); + return sessionIdNode != null ? sessionIdNode.getTextContent() : null; + } + @Override protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel, final NetconfHelloMessage message) throws NetconfDocumentedException { diff --git a/opendaylight/netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml b/opendaylight/netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml index 4ca3c99e81..1982615173 100644 --- a/opendaylight/netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml +++ b/opendaylight/netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml @@ -22,8 +22,8 @@ yang-schema-service - dom:dom-async-data-broker - inmemory-data-broker + dom:dom-broker-osgi-registry + dom-broker prefix:netconf-mapper-registry diff --git a/opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/GetSchema.java b/opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/GetSchema.java index d02cb432cb..961c9f57c2 100644 --- a/opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/GetSchema.java +++ b/opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/GetSchema.java @@ -15,7 +15,7 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException; -import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation; +import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; @@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -public class GetSchema extends AbstractLastNetconfOperation { +public class GetSchema extends AbstractSingletonNetconfOperation { public static final String GET_SCHEMA = "get-schema"; public static final String IDENTIFIER = "identifier"; public static final String VERSION = "version"; diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index cda940f9b7..064ae72bc7 100644 --- 
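
For illustration only: the reworked extractSessionId() above first evaluates the namespaced XPath and, if that yields nothing, retries with the namespace-less /hello/session-id expression before giving up. The self-contained sketch below shows the same two-step fallback with plain javax.xml.xpath; the expressions, class and method names are invented for the example, while the patched negotiator uses XMLNetconfUtil-compiled expressions and Guava's Strings.isNullOrEmpty().

import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

public class SessionIdFallbackSketch {

    private static String evaluate(final Document doc, final String expression) throws Exception {
        final XPath xpath = XPathFactory.newInstance().newXPath();
        final Node node = (Node) xpath.evaluate(expression, doc, XPathConstants.NODE);
        return node == null ? null : node.getTextContent();
    }

    static long extractSessionId(final Document hello) throws Exception {
        // 1. Namespaced lookup, corresponding to the original sessionIdXPath.
        String id = evaluate(hello,
                "/*[local-name()='hello' and namespace-uri()='urn:ietf:params:xml:ns:netconf:base:1.0']"
                + "/*[local-name()='session-id']");
        if (id == null || id.isEmpty()) {
            // 2. Fallback for servers that send <hello> without a namespace.
            id = evaluate(hello, "/hello/session-id");
        }
        if (id == null || id.isEmpty()) {
            throw new IllegalStateException("Session id not received from server");
        }
        return Long.parseLong(id);
    }

    public static void main(final String[] args) throws Exception {
        final String helloWithoutNamespace =
                "<hello><capabilities/><session-id>4</session-id></hello>";
        final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true);
        final Document doc = dbf.newDocumentBuilder()
                .parse(new InputSource(new StringReader(helloWithoutNamespace)));
        System.out.println(extractSessionId(doc)); // prints 4, via the fallback expression
    }
}
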
a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -180,7 +180,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { - LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise); + LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise); this.connectPromise = promise; startSsh(ctx, remoteAddress); } diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java index ab73126021..d7d8660ae4 100644 --- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java +++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java @@ -61,7 +61,7 @@ public class SSHTest { @AfterClass public static void tearDown() throws Exception { hashedWheelTimer.stop(); - nettyGroup.shutdownGracefully().await(); + nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS); minaTimerEx.shutdownNow(); nioExec.shutdownNow(); } diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperation.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperation.java index 4491e763b3..3e64e93ed7 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperation.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/mapping/AbstractSingletonNetconfOperation.java @@ -7,7 +7,12 @@ */ package org.opendaylight.controller.netconf.util.mapping; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.w3c.dom.Document; +import org.w3c.dom.Element; public abstract class AbstractSingletonNetconfOperation extends AbstractLastNetconfOperation { @@ -15,6 +20,12 @@ public abstract class AbstractSingletonNetconfOperation extends AbstractLastNetc super(netconfSessionIdForReporting); } + @Override + protected Element handle(Document document, XmlElement operationElement, + NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException { + return handleWithNoSubsequentOperations(document, operationElement); + } + @Override protected HandlingPriority getHandlingPriority() { return HandlingPriority.HANDLE_WITH_MAX_PRIORITY; diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java index 5cd17a2331..404885db7e 100644 --- 
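
For illustration only: the SSHTest tear-down above now waits at most five seconds for the Netty event loop group to terminate instead of awaiting indefinitely, so a stuck executor can no longer hang the whole test run. A tiny self-contained sketch of that bounded wait; the class name is invented for the example.

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketch {

    public static void main(final String[] args) throws InterruptedException {
        final NioEventLoopGroup group = new NioEventLoopGroup();

        final Future<?> terminated = group.shutdownGracefully();
        // await(timeout, unit) returns once termination completes or the timeout
        // elapses (returning false), unlike the unbounded await() it replaces.
        final boolean done = terminated.await(5, TimeUnit.SECONDS);
        System.out.println("terminated within timeout: " + done);
    }
}
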
a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java @@ -94,9 +94,9 @@ public final class NetconfHelloMessage extends NetconfMessage { private static boolean isHelloMessage(final Document document) { XmlElement element = XmlElement.fromDomElement(document.getDocumentElement()); try { + // accept even if hello has no namespace return element.getName().equals(HELLO_TAG) && - element.hasNamespace() && - element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0); + (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0)); } catch (MissingNameSpaceException e) { // Cannot happen, since we check for hasNamespace throw new IllegalStateException(e); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java index 61b23202c3..3c6b6ccab9 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.netconf.util.messages; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.collect.Collections2; import java.util.Collection; import java.util.List; @@ -59,9 +60,13 @@ public final class NetconfMessageUtil { public static Collection extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException { XmlElement responseElement = XmlElement.fromDomDocument(doc); - XmlElement capabilitiesElement = responseElement - .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES); - List caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY); + // Extract child element from with or without(fallback) the same namespace + Optional capabilitiesElement = responseElement + .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES) + .or(responseElement + .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES)); + + List caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY); return Collections2.transform(caps, new Function() { @Override diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java index 64aeebd542..6855aa73c3 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtil.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.util.osgi; import com.google.common.base.Optional; import io.netty.channel.local.LocalAddress; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public final class 
NetconfConfigUtil { private static final String PRIVATE_KEY_PATH_PROP = ".pk.path"; private static final String CONNECTION_TIMEOUT_MILLIS_PROP = "connectionTimeoutMillis"; - private static final long DEFAULT_TIMEOUT_MILLIS = 5000; + public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30); private static final LocalAddress netconfLocalAddress = new LocalAddress("netconf"); public static LocalAddress getNetconfLocalAddress() { diff --git a/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java b/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java index 1a701057aa..a50d7fc61f 100644 --- a/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java +++ b/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/osgi/NetconfConfigUtilTest.java @@ -35,10 +35,10 @@ public class NetconfConfigUtilTest { assertEquals(NetconfConfigUtil.getNetconfLocalAddress(), new LocalAddress("netconf")); doReturn("").when(bundleContext).getProperty("netconf.connectionTimeoutMillis"); - assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), 5000); + assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), NetconfConfigUtil.DEFAULT_TIMEOUT_MILLIS); doReturn("a").when(bundleContext).getProperty("netconf.connectionTimeoutMillis"); - assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), 5000); + assertEquals(NetconfConfigUtil.extractTimeoutMillis(bundleContext), NetconfConfigUtil.DEFAULT_TIMEOUT_MILLIS); } @Test
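
For illustration only: extractCapabilitiesFromHello() above now chains two optional lookups with Guava's Optional.or(Optional), preferring the capabilities element in the expected namespace and falling back to one without a namespace. A minimal sketch of that chaining; the values and class name are invented for the example.

import com.google.common.base.Optional;

public class OptionalFallbackSketch {

    public static void main(final String[] args) {
        // Lookup in the expected namespace came back empty...
        final Optional<String> sameNamespace = Optional.<String>absent();
        // ...but the namespace-less lookup found the element.
        final Optional<String> anyNamespace = Optional.of("capabilities");

        // or(Optional) keeps the first present value; get() would still throw
        // IllegalStateException if both lookups had failed, as in the patched code.
        final Optional<String> chosen = sameNamespace.or(anyNamespace);
        System.out.println(chosen.get()); // prints capabilities
    }
}
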