From: Tony Tkacik Date: Mon, 30 Mar 2015 08:25:17 +0000 (+0000) Subject: Merge "Increase default negotiation timeout for netconf server to 30s" X-Git-Tag: release/lithium~334 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;hp=b03c5a20673792f0fb8df847fbaf9c359c7cce1b Merge "Increase default negotiation timeout for netconf server to 30s" --- diff --git a/opendaylight/adsal/features/nsf/src/main/resources/features.xml b/opendaylight/adsal/features/nsf/src/main/resources/features.xml index 3985a69302..56271eb699 100644 --- a/opendaylight/adsal/features/nsf/src/main/resources/features.xml +++ b/opendaylight/adsal/features/nsf/src/main/resources/features.xml @@ -14,8 +14,8 @@ odl-adsal-all - odl-nsf-controller-managers - odl-adsal-controller-northbound + odl-nsf-managers + odl-adsal-northbound @@ -44,43 +44,6 @@ mvn:org.opendaylight.controller/topologymanager/${topologymanager.version} mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version} - - mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version} - mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version} - mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version} - - mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting} - - mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1 - mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version} - - - - odl-base-all - odl-adsal-all - mvn:org.opendaylight.controller/usermanager/${usermanager.version} - mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version} - - mvn:org.opendaylight.controller/appauth/${appauth.version} - - mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version} - mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version} - - mvn:org.opendaylight.controller/containermanager/${containermanager.version} - mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version} - - mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version} - mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version} - - mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version} - mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version} - - mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version} - mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version} - - mvn:org.opendaylight.controller/topologymanager/${topologymanager.version} - mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version} - mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version} mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version} mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version} @@ -117,26 +80,4 @@ mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version} mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version} - - - odl-base-all - odl-nsf-managers - mvn:org.ow2.asm/asm-all/${asm.version} - mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version} - mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version} - mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version} - mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version} - mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version} - mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version} - mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version} - mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version} - mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version} - mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version} - mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version} - mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version} - mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version} - diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml index c5adb28db7..4afbedb76e 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml @@ -70,6 +70,13 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL ${artifactId}-impl ${symbol_dollar}{project.version} + + ${symbol_dollar}{project.groupId} + ${artifactId}-impl + ${symbol_dollar}{project.version} + xml + config + ${symbol_dollar}{project.groupId} ${artifactId}-api diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 130cb11c5a..2915a8dfd9 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -187,7 +187,7 @@ 2013.09.07.7-SNAPSHOT 1.1.0-SNAPSHOT 0.7.0-SNAPSHOT - 0.12.0 + 0.14.0 0.9.7 3.3.0 diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 603f34bac9..076d1b2fc7 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,16 +8,11 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; - -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; @@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.regex.Pattern; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; public class EventSourceTopology implements EventAggregatorService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl private final RpcRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; private final RpcProviderRegistry rpcRegistry; - private final ExecutorService executorService; public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + this.dataBroker = dataBroker; - this.executorService = Executors.newCachedThreadPool(); this.rpcRegistry = rpcRegistry; aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); @@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + } private void putData(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { + final InstanceIdentifier path, + final T data){ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); tx.put(store, path, data, true); tx.submit(); + } private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { @@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl } private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); + + final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + + final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + tx.close(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Can not notify existing nodes {}", t); + tx.close(); + } + + }); + } @Override @@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl // FIXME: Return registration object. } - private class NotifyAllNodeExecutor implements Runnable { - - private final EventSourceTopic topic; - private final DataBroker dataBroker; - private final Pattern nodeIdPatternRegex; - - public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { - this.topic = topic; - this.dataBroker = dataBroker; - this.nodeIdPatternRegex = nodeIdPatternRegex; - } - - @Override - public void run() { - //# Code reader note: Context of Node type is NetworkTopology - final List nodes = snapshot(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } - - private List snapshot() { - try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { - - final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); - - if(data.isPresent()) { - final List nodeList = data.get().getNode(); - if(nodeList != null) { - return nodeList; - } - } - return Collections.emptyList(); - } catch (final ReadFailedException e) { - LOG.error("Unable to retrieve node list.", e); - return Collections.emptyList(); - } - } - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 15063cff5b..bcfd472bf6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -10,21 +10,17 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class FollowerLogInformationImpl implements FollowerLogInformation { - private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); - private static final AtomicLongFieldUpdater MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex"); - private final String id; private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RaftActorContext context; - private volatile long nextIndex; + private long nextIndex; - private volatile long matchIndex; + private long matchIndex; private long lastReplicatedIndex = -1L; @@ -39,13 +35,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { } @Override - public long incrNextIndex(){ - return NEXT_INDEX_UPDATER.incrementAndGet(this); + public long incrNextIndex() { + return nextIndex++; } @Override public long decrNextIndex() { - return NEXT_INDEX_UPDATER.decrementAndGet(this); + return nextIndex--; } @Override @@ -60,7 +56,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public long incrMatchIndex(){ - return MATCH_INDEX_UPDATER.incrementAndGet(this); + return matchIndex++; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 9faffb9395..b74259d485 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -17,11 +17,11 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -38,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -102,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { public void apply(ApplyJournalEntries param) throws Exception { } }; + private static final String COMMIT_SNAPSHOT = "commit_snapshot"; protected final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -117,17 +118,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ private final RaftActorContextImpl context; + private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); + /** * The in-memory journal */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); - private CaptureSnapshot captureSnapshot = null; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; + private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); + public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -306,9 +309,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); currentBehavior = newBehavior; - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); } @Override public void handleCommand(Object message) { @@ -375,31 +378,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", persistenceId(), saveSnapshotFailure.cause()); - context.getReplicatedLog().snapshotRollback(); - - LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().size()); + context.getSnapshotManager().rollback(); } else if (message instanceof CaptureSnapshot) { LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - if(captureSnapshot == null) { - captureSnapshot = (CaptureSnapshot)message; - createSnapshot(); - } + context.getSnapshotManager().create(createSnapshotProcedure); - } else if (message instanceof CaptureSnapshotReply){ + } else if (message instanceof CaptureSnapshotReply) { handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); + } else if (message.equals(COMMIT_SNAPSHOT)) { + commitSnapshot(-1); } else { - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); + currentBehavior = currentBehavior.handleMessage(getSender(), message); - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); } } @@ -411,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog.dataSize()) .inMemoryJournalLogSize(replicatedLog.size()) - .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated()) + .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing()) .lastApplied(context.getLastApplied()) .lastIndex(replicatedLog.lastIndex()) .lastTerm(replicatedLog.lastTerm()) @@ -446,22 +443,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) { + RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); + if (oldBehavior != currentBehavior){ onStateChanged(); } - String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId(); - String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name(); + String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId(); + String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. - onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + Optional roleChangeNotifier = getRoleChangeNotifier(); + if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { + if(roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + } + + onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + } - if (getRoleChangeNotifier().isPresent() && + if (roleChangeNotifier.isPresent() && (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { - getRoleChangeNotifier().get().tell( - new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()), - getSelf()); + roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + currentBehavior.state().name()), getSelf()); } } @@ -502,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // the state to durable storage self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); - // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!context.isSnapshotCaptureInitiated()){ - raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), - raftContext.getTermInformation().getCurrentTerm()); - raftContext.getReplicatedLog().snapshotCommit(); - } else { - LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", - persistenceId(), getId()); - } + context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + } else if (clientActor != null) { // Send message for replication currentBehavior.handleMessage(getSelf(), @@ -608,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void commitSnapshot(long sequenceNumber) { - context.getReplicatedLog().snapshotCommit(); - - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(sequenceNumber); + context.getSnapshotManager().commit(persistence(), sequenceNumber); } /** @@ -703,17 +698,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void onLeaderChanged(String oldLeader, String newLeader){}; - private void trimPersistentData(long sequenceNumber) { - // Trim akka snapshots - // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied - // For now guessing that it is ANDed. - persistence().deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); - - // Trim akka journal - persistence().deleteMessages(sequenceNumber); - } - private String getLeaderAddress(){ if(isLeader()){ return getSelf().path().toString(); @@ -734,67 +718,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void handleCaptureSnapshotReply(byte[] snapshotBytes) { LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - - Snapshot sn = Snapshot.create(snapshotBytes, - context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), - captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), - captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - - persistence().saveSnapshot(sn); - - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - - long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - if (context.getReplicatedLog().dataSize() > dataThreshold) { - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", - persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, - captureSnapshot.getLastAppliedIndex()); - } - - // if memory is less, clear the log based on lastApplied. - // this could/should only happen if one of the followers is down - // as normally we keep removing from the log when its replicated to all. - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); - - // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an - // install snapshot to a follower. - if(captureSnapshot.getReplicatedToAllIndex() >= 0) { - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } - } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ - // clear the log based on replicatedToAllIndex - context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), - captureSnapshot.getReplicatedToAllTerm()); - - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - } else { - // The replicatedToAllIndex was not found in the log - // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. - // In this scenario we may need to save the snapshot to the akka persistence - // snapshot for recovery but we do not need to do the replicated log trimming. - context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - } - - - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + - "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(), - replicatedLog.getSnapshotTerm()); - - if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { - // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot( - ByteString.copyFrom(snapshotBytes))); - } - - captureSnapshot = null; - context.setSnapshotCaptureInitiated(false); + context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory()); } protected long getTotalMemory() { @@ -806,9 +730,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { - private static final int DATA_SIZE_DIVIDER = 5; - private long dataSizeSinceLastSnapshot = 0; + private long dataSizeSinceLastSnapshot = 0L; + public ReplicatedLogImpl(Snapshot snapshot) { super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), @@ -874,9 +798,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex() + 1; - if(!hasFollowers()) { + if (!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log // due to this the journalSize will never become anything close to the // snapshot batch count. In fact will mostly be 1. @@ -890,51 +813,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // as if we were maintaining a real snapshot dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; } - + long journalSize = replicatedLogEntry.getIndex() + 1; long dataThreshold = getTotalMemory() * - getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - // when a snaphsot is being taken, captureSnapshot != null - if (!context.isSnapshotCaptureInitiated() && - ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || - dataSizeForCheck > dataThreshold)) { - - dataSizeSinceLastSnapshot = 0; + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," + - " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold); + if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 + || dataSizeForCheck > dataThreshold)) { - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; + boolean started = context.getSnapshotManager().capture(replicatedLogEntry, + currentBehavior.getReplicatedToAllIndex()); - ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); - if (!hasFollowers()) { - lastAppliedIndex = replicatedLogEntry.getIndex(); - lastAppliedTerm = replicatedLogEntry.getTerm(); - } else if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); + if(started){ + dataSizeSinceLastSnapshot = 0; } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); - LOG.debug("{}: Snapshot Capture lastApplied:{} ", - persistenceId(), context.getLastApplied()); - LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), - lastAppliedIndex); - LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), - lastAppliedTerm); - } - - // send a CaptureSnapshot to self to make the expensive operation async. - long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), - null); - context.setSnapshotCaptureInitiated(true); } + if (callback != null){ callback.apply(replicatedLogEntry); } @@ -1038,7 +932,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void saveSnapshot(Object o) { // Make saving Snapshot successful - commitSnapshot(-1L); + // Committing the snapshot here would end up calling commit in the creating state which would + // be a state violation. That's why now we send a message to commit the snapshot. + self().tell(COMMIT_SNAPSHOT, self()); + } + } + + + private class CreateSnapshotProcedure implements Procedure { + + @Override + public void apply(Void aVoid) throws Exception { + createSnapshot(); } } @@ -1051,4 +956,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return currentBehavior; } + private static class BehaviorStateHolder { + private RaftActorBehavior behavior; + private String leaderId; + + void init(RaftActorBehavior behavior) { + this.behavior = behavior; + this.leaderId = behavior != null ? behavior.getLeaderId() : null; + } + + RaftActorBehavior getBehavior() { + return behavior; + } + + String getLeaderId() { + return leaderId; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 9d391a1588..2e7eb5eb3a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -166,8 +166,6 @@ public interface RaftActorContext { */ ConfigParams getConfigParams(); - void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated); - - boolean isSnapshotCaptureInitiated(); + SnapshotManager getSnapshotManager(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 6fc5e4369b..eb059d60fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext { private boolean snapshotCaptureInitiated; + // Snapshot manager will need to be created on demand as it needs raft actor context which cannot + // be passed to it in the constructor + private SnapshotManager snapshotManager; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, @@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } - @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; - } - @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } @@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext { peerAddresses.put(peerId, peerAddress); } + + public SnapshotManager getSnapshotManager() { + if(snapshotManager == null){ + snapshotManager = new SnapshotManager(this, LOG); + } + return snapshotManager; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java new file mode 100644 index 0000000000..432d678491 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -0,0 +1,431 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +public class SnapshotManager implements SnapshotState { + + + private final SnapshotState IDLE = new Idle(); + private final SnapshotState CAPTURING = new Capturing(); + private final SnapshotState PERSISTING = new Persisting(); + private final SnapshotState CREATING = new Creating(); + + private final Logger LOG; + private final RaftActorContext context; + private final LastAppliedTermInformationReader lastAppliedTermInformationReader = + new LastAppliedTermInformationReader(); + private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader = + new ReplicatedToAllTermInformationReader(); + + + private SnapshotState currentState = IDLE; + private CaptureSnapshot captureSnapshot; + + public SnapshotManager(RaftActorContext context, Logger logger) { + this.context = context; + this.LOG = logger; + } + + @Override + public boolean isCapturing() { + return currentState.isCapturing(); + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower); + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return currentState.capture(lastLogEntry, replicatedToAllIndex); + } + + @Override + public void create(Procedure callback) { + currentState.create(callback); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + currentState.commit(persistenceProvider, sequenceNumber); + } + + @Override + public void rollback() { + currentState.rollback(); + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return currentState.trimLog(desiredTrimIndex, currentBehavior); + } + + private boolean hasFollowers(){ + return context.getPeerAddresses().keySet().size() > 0; + } + + private String persistenceId(){ + return context.getId(); + } + + private class AbstractSnapshotState implements SnapshotState { + + @Override + public boolean isCapturing() { + return false; + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + LOG.debug("capture should not be called in state {}", this); + return false; + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + LOG.debug("captureToInstall should not be called in state {}", this); + return false; + } + + @Override + public void create(Procedure callback) { + LOG.debug("create should not be called in state {}", this); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + LOG.debug("persist should not be called in state {}", this); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + LOG.debug("commit should not be called in state {}", this); + } + + @Override + public void rollback() { + LOG.debug("rollback should not be called in state {}", this); + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + LOG.debug("trimLog should not be called in state {}", this); + return -1; + } + + protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){ + // we would want to keep the lastApplied as its used while capturing snapshots + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + + if(LOG.isTraceEnabled()) { + LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}", + persistenceId(), desiredTrimIndex, lastApplied, tempMin); + } + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, + context.getTermInformation().getCurrentTerm()); + + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotCommit(); + return tempMin; + } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) { + // It's possible a follower was lagging and an install snapshot advanced its match index past + // the current replicatedToAllIndex. Since the follower is now caught up we should advance the + // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely + // due to a previous snapshot triggered by the memory threshold exceeded, in that case we + // trim the log to the last applied index even if previous entries weren't replicated to all followers. + currentBehavior.setReplicatedToAllIndex(tempMin); + } + return -1; + } + } + + private class Idle extends AbstractSnapshotState { + + private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + TermInformationReader lastAppliedTermInfoReader = + lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), + lastLogEntry, hasFollowers()); + + long lastAppliedIndex = lastAppliedTermInfoReader.getIndex(); + long lastAppliedTerm = lastAppliedTermInfoReader.getTerm(); + + TermInformationReader replicatedToAllTermInfoReader = + replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex); + + long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex(); + long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); + + // send a CaptureSnapshot to self to make the expensive operation async. + captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), + lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, + newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null); + + SnapshotManager.this.currentState = CAPTURING; + + if(targetFollower != null){ + LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); + } else { + LOG.info("{}: Initiating snapshot capture {} to install on {}", + persistenceId(), captureSnapshot, targetFollower); + } + + context.getActor().tell(captureSnapshot, context.getActor()); + + return true; + } + + @Override + public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null); + } + + @Override + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + return capture(lastLogEntry, replicatedToAllIndex, targetFollower); + } + + @Override + public String toString() { + return "Idle"; + } + + @Override + public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + return doTrimLog(desiredTrimIndex, currentBehavior); + } + } + + private class Capturing extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void create(Procedure callback) { + try { + callback.apply(null); + SnapshotManager.this.currentState = CREATING; + } catch (Exception e) { + LOG.error("Unexpected error occurred", e); + } + } + + @Override + public String toString() { + return "Capturing"; + } + + } + + private class Creating extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior, long totalMemory) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(snapshotBytes, + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + persistenceProvider.saveSnapshot(sn); + + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); + + long dataThreshold = totalMemory * + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", + persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } + + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an + // install snapshot to a follower. + if(captureSnapshot.getReplicatedToAllIndex() >= 0) { + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } + + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else { + // The replicatedToAllIndex was not found in the log + // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. + // In this scenario we may need to save the snapshot to the akka persistence + // snapshot for recovery but we do not need to do the replicated log trimming. + context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); + } + + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + if (context.getId().equals(currentBehavior.getLeaderId()) + && captureSnapshot.isInstallSnapshotInitiated()) { + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot( + ByteString.copyFrom(snapshotBytes))); + } + + captureSnapshot = null; + SnapshotManager.this.currentState = PERSISTING; + } + + @Override + public String toString() { + return "Creating"; + } + + } + + private class Persisting extends AbstractSnapshotState { + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); + + persistenceProvider.deleteMessages(sequenceNumber); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public void rollback() { + context.getReplicatedLog().snapshotRollback(); + + LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public String toString() { + return "Persisting"; + } + + } + + private static interface TermInformationReader { + long getIndex(); + long getTerm(); + } + + private static class LastAppliedTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, + ReplicatedLogEntry lastLogEntry, boolean hasFollowers){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + if (!hasFollowers) { + if(lastLogEntry != null) { + index = lastLogEntry.getIndex(); + term = lastLogEntry.getTerm(); + } + } else if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } else if(log.getSnapshotIndex() > -1){ + index = log.getSnapshotIndex(); + term = log.getSnapshotTerm(); + } + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } + + private static class ReplicatedToAllTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + + if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } + + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java new file mode 100644 index 0000000000..9a9bf1c774 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + +public interface SnapshotState { + /** + * Should return true when a snapshot is being captured + * @return + */ + boolean isCapturing(); + + /** + * Initiate capture snapshot + * + * @param lastLogEntry the last entry in the replicated log + * @param replicatedToAllIndex the current replicatedToAllIndex + * + * @return true if capture was started + */ + boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + + /** + * Initiate capture snapshot for the purposing of installing that snapshot + * + * @param lastLogEntry + * @param replicatedToAllIndex + * @param targetFollower + * + * @return true if capture was started + */ + boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); + + /** + * Create the snapshot + * + * @param callback a procedure to be called which should create the snapshot + */ + void create(Procedure callback); + + /** + * Persist the snapshot + * + * @param persistenceProvider + * @param snapshotBytes + * @param currentBehavior + * @param totalMemory + */ + void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior + ,long totalMemory); + + /** + * Commit the snapshot by trimming the log + * + * @param persistenceProvider + * @param sequenceNumber + */ + void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber); + + /** + * Rollback the snapshot + */ + void rollback(); + + /** + * Trim the log + * + * @param desiredTrimIndex + * @return the actual trim index + */ + long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a63c62fa30..2c433f9007 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } @@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (wasLastChunk && !context.isSnapshotCaptureInitiated()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { @@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - - CaptureSnapshot captureSnapshot = new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId, - captureSnapshot); - } - - actor().tell(captureSnapshot, actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index a1bcf8541c..c276d32cce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -39,6 +39,8 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); + /** * Information about the RaftActor whose behavior this class represents */ @@ -254,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // message is sent to itself electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), + context.getActor(), ELECTION_TIMEOUT, context.getActorSystem().dispatcher(), context.getActor()); } @@ -460,31 +462,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param snapshotCapturedIndex */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { - // we would want to keep the lastApplied as its used while capturing snapshots - long lastApplied = context.getLastApplied(); - long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); - - if(LOG.isTraceEnabled()) { - LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}", - logName, snapshotCapturedIndex, lastApplied, tempMin); - } + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); - if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin, - context.getTermInformation().getCurrentTerm()); - - //use the term of the temp-min, since we check for isPresent, entry will not be null - ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); - context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); - context.getReplicatedLog().snapshotCommit(); - setReplicatedToAllIndex(tempMin); - } else if(tempMin > getReplicatedToAllIndex()) { - // It's possible a follower was lagging and an install snapshot advanced its match index past - // the current replicatedToAllIndex. Since the follower is now caught up we should advance the - // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely - // due to a previous snapshot triggered by the memory threshold exceeded, in that case we - // trim the log to the last applied index even if previous entries weren't replicated to all followers. - setReplicatedToAllIndex(tempMin); + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index b36c41abf2..74bede171f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -58,7 +58,14 @@ public class Candidate extends AbstractRaftActorBehavior { votesRequired = getMajorityVoteCount(peers.size()); startNewTerm(); - scheduleElection(electionDuration()); + + if(context.getPeerAddresses().isEmpty()){ + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } + + } @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 0f251a3012..a6722e6ff9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -46,9 +46,14 @@ public class Follower extends AbstractRaftActorBehavior { public Follower(RaftActorContext context) { super(context, RaftState.Follower); - scheduleElection(electionDuration()); - initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); + + if(context.getPeerAddresses().isEmpty()){ + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } + } private boolean isLogEntryPresent(long index){ @@ -96,6 +101,19 @@ public class Follower extends AbstractRaftActorBehavior { // to make it easier to read. Before refactoring ensure tests // cover the code properly + if (snapshotTracker != null) { + // if snapshot install is in progress, follower should just acknowledge append entries with a reply. + AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex(), lastTerm()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); + } + sender.tell(reply, actor()); + + return this; + } + // 1. Reply false if term < currentTerm (§5.1) // This is handled in the appendEntries method of the base class @@ -242,7 +260,7 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(reply, actor()); - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 120a3a16a9..13445b0b26 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -261,4 +261,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex()); assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData()); } + + protected String testActorPath(String id){ + return "akka://test/user" + id; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 1cc7b5f576..53cca23741 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext { private Map peerAddresses = new HashMap<>(); private ConfigParams configParams; private boolean snapshotCaptureInitiated; + private SnapshotManager snapshotManager; public MockRaftActorContext(){ electionTerm = new ElectionTerm() { @@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext { } @Override - public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { - this.snapshotCaptureInitiated = snapshotCaptureInitiated; - } - - @Override - public boolean isSnapshotCaptureInitiated() { - return snapshotCaptureInitiated; + public SnapshotManager getSnapshotManager() { + if(this.snapshotManager == null){ + this.snapshotManager = new SnapshotManager(this, getLogger()); + } + return this.snapshotManager; } public void setConfigParams(ConfigParams configParams) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index b192b7cd24..0a4a2c7717 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -54,16 +55,17 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -157,6 +159,16 @@ public class RaftActorTest extends AbstractActorTest { } } + + public void waitUntilLeader(){ + for(int i = 0;i < 10; i++){ + if(isLeader()){ + break; + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + public List getState() { return state; } @@ -176,6 +188,13 @@ public class RaftActorTest extends AbstractActorTest { return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier, + DataPersistenceProvider dataPersistenceProvider){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); @@ -673,11 +692,13 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0); - verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class)); } }; } @@ -701,9 +722,11 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); + mockRaftActor.waitUntilLeader(); + mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class)); + verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); } @@ -736,10 +759,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1)); - RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, -1, + new MockRaftActorContext.MockPayload("D")), -1); + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -763,17 +788,18 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); + ImmutableMap.of("leader", "fake/path"), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); mockRaftActor.waitForInitializeBehaviorComplete(); + MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class))); mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class))); - mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class))); + mockRaftActor.getReplicatedLog().append(lastEntry); ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), @@ -785,7 +811,8 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); long replicatedToAllIndex = 1; - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1)); + + mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); verify(mockRaftActor.delegate).createSnapshot(); @@ -927,7 +954,9 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); - mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1)); + raftActorContext.getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, + new MockRaftActorContext.MockPayload("D")), 1); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -942,7 +971,83 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testRaftRoleChangeNotifier() throws Exception { + public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); + MessageCollectorActor.waitUntilReady(notifierActor); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + long heartBeatInterval = 100; + config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS)); + config.setElectionTimeoutFactor(20); + + String persistenceId = factory.generateActorId("notifier-"); + + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, + Collections.emptyMap(), Optional.of(config), notifierActor, + new NonPersistentProvider()), persistenceId); + + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); + + + // check if the notifier got a role change from null to Follower + RoleChanged raftRoleChanged = matches.get(0); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertNull(raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Follower to Candidate + raftRoleChanged = matches.get(1); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = matches.get(2); + assertEquals(persistenceId, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + }}; + } + + @Test + public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception { new JavaTestKit(getSystem()) {{ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); MessageCollectorActor.waitUntilReady(notifierActor); @@ -954,8 +1059,8 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, - Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); + factory.createActor(MockRaftActor.props(persistenceId, + ImmutableMap.of("leader", "fake/path"), Optional.of(config), notifierActor), persistenceId); List matches = null; for(int i = 0; i < 5000 / heartBeatInterval; i++) { @@ -967,7 +1072,7 @@ public class RaftActorTest extends AbstractActorTest { Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); } - assertEquals(3, matches.size()); + assertEquals(2, matches.size()); // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); @@ -981,11 +1086,6 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); - // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = matches.get(2); - assertEquals(persistenceId, raftRoleChanged.getMemberId()); - assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); - assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); }}; } @@ -1031,9 +1131,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); - leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1)); + leaderActor.getRaftActorContext().getSnapshotManager() + .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("x")), 4); - leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(leaderActor.delegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1059,8 +1160,14 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-2"), new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); - leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider() + , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); + + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -1123,9 +1230,10 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(6, followerActor.getReplicatedLog().size()); //snapshot on 4 - followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1)); + followerActor.getRaftActorContext().getSnapshotManager().capture( + new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("D")), 4); - followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); verify(followerActor.delegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); @@ -1160,7 +1268,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + + // The commit is needed to complete the snapshot creation process + followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log @@ -1258,7 +1369,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -1342,7 +1453,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -1385,7 +1496,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }}; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index 7a291f364b..bd670fd581 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -62,9 +62,11 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt // Create the leader and 2 follower actors and verify initial syncing of the followers after leader // persistence recovery. - follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams()); + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), newFollowerConfigParams()); - follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), newFollowerConfigParams()); peerAddresses = ImmutableMap.builder(). put(follower1Id, follower1Actor.path().toString()). diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index aca19c0b8b..d4a9f7701b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -56,10 +56,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId)); // Create the leader and 2 follower actors. + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), newFollowerConfigParams()); - follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams()); - - follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), newFollowerConfigParams()); Map peerAddresses = ImmutableMap.builder(). put(follower1Id, follower1Actor.path().toString()). diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java new file mode 100644 index 0000000000..3d75edb5bd --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -0,0 +1,551 @@ +package org.opendaylight.controller.cluster.raft; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import akka.testkit.TestActorRef; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.LoggerFactory; + +public class SnapshotManagerTest extends AbstractActorTest { + + @Mock + private RaftActorContext mockRaftActorContext; + + @Mock + private ConfigParams mockConfigParams; + + @Mock + private ReplicatedLog mockReplicatedLog; + + @Mock + private DataPersistenceProvider mockDataPersistenceProvider; + + @Mock + private RaftActorBehavior mockRaftActorBehavior; + + @Mock + private Procedure mockProcedure; + + private SnapshotManager snapshotManager; + + private TestActorFactory factory; + + private TestActorRef actorRef; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses(); + doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams(); + doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); + doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); + doReturn("123").when(mockRaftActorContext).getId(); + doReturn("123").when(mockRaftActorBehavior).getLeaderId(); + + snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); + factory = new TestActorFactory(getSystem()); + + actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); + doReturn(actorRef).when(mockRaftActorContext).getActor(); + + } + + @After + public void tearDown(){ + factory.close(); + } + + @Test + public void testConstruction(){ + assertEquals(false, snapshotManager.isCapturing()); + } + + @Test + public void testCaptureToInstall(){ + + // Force capturing toInstall = true + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload()), 0, "follower-1"); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(0L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(0L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + actorRef.underlyingActor().clear(); + } + + @Test + public void testCapture(){ + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(9L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(9L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + + actorRef.underlyingActor().clear(); + + } + + @Test + public void testIllegalCapture() throws Exception { + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + List allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + + // This will not cause snapshot capture to start again + capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertFalse(capture); + + allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + } + + @Test + public void testPersistWhenReplicatedToAllIndexMinusOne(){ + doReturn("123").when(mockRaftActorContext).getId(); + doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); + verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); + + Snapshot snapshot = snapshotArgumentCaptor.getValue(); + + assertEquals(6, snapshot.getLastAppliedTerm()); + assertEquals(9, snapshot.getLastAppliedIndex()); + assertEquals(9, snapshot.getLastIndex()); + assertEquals(6, snapshot.getLastTerm()); + assertEquals(10, snapshot.getState().length); + assertTrue(Arrays.equals(bytes, snapshot.getState())); + assertEquals(0, snapshot.getUnAppliedEntries().size()); + + verify(mockReplicatedLog).snapshotPreCommit(45L, 6L); + } + + + @Test + public void testCreate() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure).apply(null); + } + + @Test + public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + } + + @Test + public void testCallingCreateBeforeCapture() throws Exception { + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(0)).apply(null); + } + + @Test + public void testCallingCreateAfterPersist() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + reset(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, never()).apply(null); + } + + @Test + public void testPersistWhenReplicatedToAllIndexNotMinus(){ + doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9); + doReturn(6L).when(replicatedLogEntry).getTerm(); + doReturn(9L).when(replicatedLogEntry).getIndex(); + + // when replicatedToAllIndex != -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), 9); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(9); + } + + + @Test + public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + } + + @Test + public void testPersistSendInstallSnapshot(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + assertTrue(capture); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + ArgumentCaptor sendInstallSnapshotArgumentCaptor + = ArgumentCaptor.forClass(SendInstallSnapshot.class); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture()); + + SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); + + assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray())); + } + + @Test + public void testCallingPersistWithoutCaptureWillDoNothing(){ + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + @Test + public void testCallingPersistTwiceWillDoNoHarm(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + + @Test + public void testCommit(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog).snapshotCommit(); + + verify(mockDataPersistenceProvider).deleteMessages(100L); + + ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); + + verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture()); + + assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100 + // config snapShotBatchCount = 10 + // therefore maxSequenceNumber = 90 + } + + @Test + public void testCommitBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(100L); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + @Test + public void testCommitBeforeCapture(){ + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong()); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + @Test + public void testCallingCommitMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, times(1)).snapshotCommit(); + + verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L); + + verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + } + + @Test + public void testRollback(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.rollback(); + + verify(mockReplicatedLog).snapshotRollback(); + } + + + @Test + public void testRollbackBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testRollbackBeforeCapture(){ + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testCallingRollbackMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1, "follower-1"); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior + , Runtime.getRuntime().totalMemory()); + + snapshotManager.rollback(); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, times(1)).snapshotRollback(); + } + + @Test + public void testTrimLog(){ + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10, mockRaftActorBehavior); + + verify(mockReplicatedLog).snapshotPreCommit(10, 5); + verify(mockReplicatedLog).snapshotCommit(); + } + + @Test + public void testTrimLogAfterCapture(){ + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10, mockRaftActorBehavior); + + verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + + @Test + public void testTrimLogAfterCaptureToInstall(){ + boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9, "follower-1"); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10, mockRaftActorBehavior); + + verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 60f45523cf..63fd530675 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -1,12 +1,15 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -184,6 +187,20 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { assertEquals("getTerm", 1001, reply.getTerm()); } + @Test + public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + candidate = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 29fb613327..26e4364878 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -2,16 +2,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -577,12 +584,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -627,6 +629,57 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(((Follower) follower).getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(((Follower) follower).getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = mock(AppendEntries.class); + doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + // We should not hit the code that needs to look at prevLogIndex because we are short circuiting + verify(appendEntries, never()).getPrevLogIndex(); + + } + @Test public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { logStart("testInitialSyncUpWithHandleInstallSnapshot"); @@ -635,12 +688,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -692,12 +740,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, getNextChunk(bsSnapshot, 10, 50), 3, 3); @@ -714,6 +757,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; @@ -746,6 +804,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockPayload(data)); } + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 383ebefd36..ba0bd0f29c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -48,6 +48,7 @@ import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -524,6 +525,8 @@ public class LeaderTest extends AbstractLeaderTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -946,7 +949,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + return createActorContext(LEADER_ID, actorRef); } private MockRaftActorContext createActorContextWithFollower() { @@ -1025,14 +1028,15 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -1267,6 +1271,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setConfigParams(configParams); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java new file mode 100644 index 0000000000..ec35b03b0a --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * A message initiated internally from the RaftActor when some state of a leader has changed + * + * @author Thomas Pantelis + */ +public class LeaderStateChanged implements Serializable { + private static final long serialVersionUID = 1L; + + private final String memberId; + private final String leaderId; + + public LeaderStateChanged(String memberId, String leaderId) { + this.memberId = memberId; + this.leaderId = leaderId; + } + + public String getMemberId() { + return memberId; + } + + public String getLeaderId() { + return leaderId; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId) + .append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java index d065f6d211..598dfb1fe8 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -17,16 +17,17 @@ import java.util.Map; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; /** - * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying + * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying * the listeners (within the same node), which are registered with it. *

* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor. */ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable { - private String memberId; - private Map registeredListeners = Maps.newHashMap(); + private final String memberId; + private final Map registeredListeners = Maps.newHashMap(); private RoleChangeNotification latestRoleChangeNotification = null; + private LeaderStateChanged latestLeaderStateChanged; public RoleChangeNotifier(String memberId) { this.memberId = memberId; @@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos getSender().tell(new RegisterRoleChangeListenerReply(), getSelf()); + if(latestLeaderStateChanged != null) { + getSender().tell(latestLeaderStateChanged, getSelf()); + } + if (latestRoleChangeNotification != null) { getSender().tell(latestRoleChangeNotification, getSelf()); } @@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos for (ActorRef listener: registeredListeners.values()) { listener.tell(latestRoleChangeNotification, getSelf()); } + } else if (message instanceof LeaderStateChanged) { + latestLeaderStateChanged = (LeaderStateChanged)message; + + for (ActorRef listener: registeredListeners.values()) { + listener.tell(latestLeaderStateChanged, getSelf()); + } } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java index f315bfdf7a..770d709abe 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java @@ -28,4 +28,13 @@ public class RoleChanged { public String getNewRole() { return newRole; } + + @Override + public String toString() { + return "RoleChanged{" + + "memberId='" + memberId + '\'' + + ", oldRole='" + oldRole + '\'' + + ", newRole='" + newRole + '\'' + + '}'; + } } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 7df398355e..cfbf9450aa 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -35,8 +35,8 @@ operational.persistent=false # failing an operation (eg transaction create and change listener registration). #shard-initialization-timeout-in-seconds=300 -# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken. -#shard-journal-recovery-log-batch-size=5000 +# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store. +#shard-journal-recovery-log-batch-size=1000 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken. #shard-snapshot-batch-count=20000 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index 886c473067..538f2981da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; @@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { final long startTime = System.nanoTime(); + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Boolean result) { @@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { new TransactionCommitFailedException( "Can Commit failed, no detailed cause available.")); } else { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the preCommit phase doPreCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor); } } } @@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture canCommitFuture = cohort.canCommit(); - Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the commit phase doCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor); } } @@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture preCommitFuture = cohort.preCommit(); - Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we're done. commitStatsTracker.addDuration(System.nanoTime() - startTime); clientSubmitFuture.set(); + } else { + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor); } } @@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture commitFuture = cohort.commit(); - Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); } private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 681132e660..8ac424a6a8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -116,7 +116,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration DataChangeScope scope) { Future future = actorContext.executeOperationAsync(shard, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + new RegisterChangeListener(path, dataChangeListenerActor, scope), actorContext.getDatastoreContext().getShardInitializationTimeout()); future.onComplete(new OnComplete(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 99bc9de6a2..8e00a1389c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -140,16 +139,14 @@ public class Shard extends RaftActor { * Coordinates persistence recovery on startup. */ private ShardRecoveryCoordinator recoveryCoordinator; - private List currentLogRecoveryBatch; private final DOMTransactionFactory transactionFactory; private final String txnDispatcherPath; - protected Shard(final ShardIdentifier name, final Map peerAddresses, + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), mapPeerAddresses(peerAddresses), - Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); this.name = name.toString(); this.datastoreContext = datastoreContext; @@ -191,6 +188,8 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + + recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG); } private void setTransactionCommitTimeout() { @@ -198,20 +197,8 @@ public class Shard extends RaftActor { datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); } - private static Map mapPeerAddresses( - final Map peerAddresses) { - Map map = new HashMap<>(); - - for (Map.Entry entry : peerAddresses - .entrySet()) { - map.put(entry.getKey().toString(), entry.getValue()); - } - - return map; - } - public static Props props(final ShardIdentifier name, - final Map peerAddresses, + final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); @@ -690,7 +677,7 @@ public class Shard extends RaftActor { LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", persistenceId(), listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf()); } private ListenerRegistration txList = recoveryCoordinator.getTransactions(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size()); - } - - for(DOMStoreWriteTransaction tx: txList) { - try { - syncCommitTransaction(tx); - shardMBean.incrementCommittedTransactionCount(); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to commit", persistenceId(), e); - } - } - } - recoveryCoordinator = null; - currentLogRecoveryBatch = null; //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); @@ -950,11 +883,11 @@ public class Shard extends RaftActor { private static final long serialVersionUID = 1L; final ShardIdentifier name; - final Map peerAddresses; + final Map peerAddresses; final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(final ShardIdentifier name, final Map peerAddresses, + ShardCreator(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 136c6813ea..52762b4eb3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; @@ -20,39 +21,44 @@ import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; @@ -74,7 +80,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -90,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; + private final String shardManagerIdentifierString; + private final ClusterWrapper cluster; private final Configuration configuration; @@ -116,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; @@ -152,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { - findPrimary(FindPrimary.fromSerializable(message)); + if (message instanceof FindPrimary) { + findPrimary((FindPrimary)message); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -172,15 +181,54 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRoleChangeNotification((RoleChangeNotification) message); } else if(message instanceof FollowerInitialSyncUpStatus){ onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); - } else{ + } else if(message instanceof ShardNotInitializedTimeout) { + onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); + } else if(message instanceof LeaderStateChanged) { + onLeaderStateChanged((LeaderStateChanged)message); + } else { unknownMessage(message); } } + private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); + + ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + + } else { + LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); + } + } + + private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + ShardInformation shardInfo = message.getShardInfo(); + + LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), + shardInfo.getShardName()); + + shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); + + if(!shardInfo.isShardInitialized()) { + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); + } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + } + } + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { - LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), - status.isInitialSyncDone()); + LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), + status.getName(), status.isInitialSyncDone()); ShardInformation shardInformation = findShardInformation(status.getName()); @@ -193,16 +241,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onRoleChangeNotification(RoleChangeNotification roleChanged) { - LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - if (isReady()) { - LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, - waitTillReadyCountdownLatch.getCount()); + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); waitTillReadyCountdownLatch.countDown(); } @@ -222,10 +270,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return null; } - private boolean isReady() { + private boolean isReadyWithLeaderId() { boolean isReady = true; for (ShardInformation info : localShards.values()) { - if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + if(!info.isShardReadyWithLeaderId()){ isReady = false; break; } @@ -256,14 +304,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardId.getShardName() == null) { return; } + markShardAsInitialized(shardId.getShardName()); } private void markShardAsInitialized(String shardName) { - LOG.debug("Initializing shard [{}]", shardName); + LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName); + ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { shardInformation.setActorInitialized(); + + shardInformation.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -300,7 +352,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -308,20 +360,42 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, - final Supplier messageSupplier) { - if (!shardInformation.isShardInitialized()) { - if(waitUntilInitialized) { + private void sendResponse(ShardInformation shardInformation, boolean doWait, + boolean wantShardReady, final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) { + if(doWait) { final ActorRef sender = getSender(); final ActorRef self = self(); - shardInformation.addRunnableOnInitialized(new Runnable() { + + Runnable replyRunnable = new Runnable() { @Override public void run() { sender.tell(messageSupplier.get(), self); } - }); + }; + + OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : + new OnShardInitialized(replyRunnable); + + shardInformation.addOnShardInitialized(onShardInitialized); + + LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( + datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), + getContext().dispatcher(), getSelf()); + + onShardInitialized.setTimeoutSchedule(timeoutSchedule); + + } else if (!shardInformation.isShardInitialized()) { + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); } else { - getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } return; @@ -330,19 +404,38 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } + private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + return new NoShardLeaderException(String.format( + "Could not find a leader for shard %s. This typically happens when the system is coming up or " + + "recovering and a leader is being elected. Try again later.", shardId)); + } + + private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + return new NotInitializedException(String.format( + "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.remove(message.member().roles().head()); } private void memberUp(ClusterEvent.MemberUp message) { String memberName = message.member().roles().head(); + LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.put(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName), - getShardActorPath(shardName, memberName)); + info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), + getShardActorPath(shardName, memberName), getSelf()); } } @@ -384,13 +477,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { - info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString())); + info.setActor(newShardActor(schemaContext, info)); } else { info.getActor().tell(message, getSelf()); } - info.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -402,54 +492,71 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } + @VisibleForTesting + protected ClusterWrapper getCluster() { + return cluster; + } + + @VisibleForTesting + protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { + return getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + } + private void findPrimary(FindPrimary message) { - String shardName = message.getShardName(); + LOG.debug("{}: In findPrimary: {}", persistenceId(), message); + + final String shardName = message.getShardName(); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } + + return found; } }); return; } - List members = configuration.getMembersFromShardName(shardName); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!cluster.getCurrentMemberName().equals(entry.getKey())) { + String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - if(cluster.getCurrentMemberName() != null) { - members.remove(cluster.getCurrentMemberName()); - } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, path); - /** - * FIXME: Instead of sending remote shard actor path back to sender, - * forward FindPrimary message to remote shard manager - */ - // There is no way for us to figure out the primary (for now) so assume - // that one of the remote nodes is a primary - for(String memberName : members) { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - getShardActorPath(shardName, memberName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + getContext().actorSelection(path).forward(message, getContext()); return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + + LOG.debug("{}: No shard found for {}", persistenceId(), shardName); + + getSender().tell(new PrimaryNotFoundException( + String.format("No primary shard found for %s.", shardName)), getSelf()); + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); + return builder; } private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()) - .append("/user/") - .append(ShardManagerIdentifier.builder().type(type).build().toString()) - .append("/") + StringBuilder builder = getShardManagerActorPathBuilder(address); + builder.append("/") .append(getShardIdentifier(memberName, shardName)); return builder.toString(); } @@ -481,7 +588,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } @@ -496,22 +603,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); - List members = - this.configuration.getMembersFromShardName(shardName); + List members = this.configuration.getMembersFromShardName(shardName); String currentMemberName = this.cluster.getCurrentMemberName(); for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - ShardIdentifier shardId = getShardIdentifier(memberName, - shardName); - String path = - getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId, path); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + String path = getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardId.toString(), path); } } return peerAddresses; @@ -552,23 +656,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private class ShardInformation { + @VisibleForTesting + protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; // flag that determines if the actor is ready for business private boolean actorInitialized = false; private boolean followerSyncStatus = false; - private final List runnablesOnInitialized = Lists.newArrayList(); + private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; + private String leaderId; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; @@ -595,11 +701,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } - Map getPeerAddresses() { + Map getPeerAddresses() { return peerAddresses; } - void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ @@ -611,42 +717,87 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { peerId, peerAddress, actor.path()); } - actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); } } + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + } + boolean isShardInitialized() { return getActor() != null && actorInitialized; } + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return peerAddresses.get(leaderId); + } + } + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + this.actorInitialized = true; - for(Runnable runnable: runnablesOnInitialized) { - runnable.run(); + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; } - runnablesOnInitialized.clear(); + boolean ready = isShardReadyWithLeaderId(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? "ready" : "initialized", onShardInitializedSet.size()); + } + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if(!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } + } } - void addRunnableOnInitialized(Runnable runnable) { - runnablesOnInitialized.add(runnable); + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); } - public void setRole(String newRole) { - this.role = newRole; + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); } - public String getRole(){ - return this.role; + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); } - public void setFollowerSyncStatus(boolean syncStatus){ + void setFollowerSyncStatus(boolean syncStatus){ this.followerSyncStatus = syncStatus; } - public boolean isInSync(){ + boolean isInSync(){ if(RaftState.Follower.name().equals(this.role)){ return followerSyncStatus; } else if(RaftState.Leader.name().equals(this.role)){ @@ -656,6 +807,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + void setLeaderId(String leaderId) { + this.leaderId = leaderId; + + notifyOnShardInitializedCallbacks(); + } } private static class ShardManagerCreator implements Creator { @@ -680,6 +836,57 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private static class OnShardInitialized { + private final Runnable replyRunnable; + private Cancellable timeoutSchedule; + + OnShardInitialized(Runnable replyRunnable) { + this.replyRunnable = replyRunnable; + } + + Runnable getReplyRunnable() { + return replyRunnable; + } + + Cancellable getTimeoutSchedule() { + return timeoutSchedule; + } + + void setTimeoutSchedule(Cancellable timeoutSchedule) { + this.timeoutSchedule = timeoutSchedule; + } + } + + private static class OnShardReady extends OnShardInitialized { + OnShardReady(Runnable replyRunnable) { + super(replyRunnable); + } + } + + private static class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } + static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 50528575e7..7e547d7257 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -8,19 +8,19 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.Collection; -import java.util.Collections; +import java.io.IOException; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; /** @@ -34,115 +34,86 @@ import org.slf4j.Logger; */ class ShardRecoveryCoordinator { - private static final int TIME_OUT = 10; - - private final List resultingTxList = Lists.newArrayList(); - private final SchemaContext schemaContext; + private final InMemoryDOMDataStore store; + private List currentLogRecoveryBatch; private final String shardName; - private final ExecutorService executor; private final Logger log; - private final String name; - ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log, - String name) { - this.schemaContext = schemaContext; + ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) { + this.store = store; this.shardName = shardName; this.log = log; - this.name = name; - - executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ShardRecovery-" + shardName + "-%d").build()); } - /** - * Submits a batch of journal log entries. - * - * @param logEntries the serialized journal log entries - * @param resultingTx the write Tx to which to apply the entries - */ - void submit(List logEntries, DOMStoreWriteTransaction resultingTx) { - LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx); - resultingTxList.add(resultingTx); - executor.execute(task); - } + void startLogRecoveryBatch(int maxBatchSize) { + currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - /** - * Submits a snapshot. - * - * @param snapshotBytes the serialized snapshot - * @param resultingTx the write Tx to which to apply the entries - */ - void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { - SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx); - resultingTxList.add(resultingTx); - executor.execute(task); + log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); } - Collection getTransactions() { - // Shutdown the executor and wait for task completion. - executor.shutdown(); - + void appendRecoveredLogPayload(Payload payload) { try { - if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) { - return resultingTxList; + if(payload instanceof ModificationPayload) { + currentLogRecoveryBatch.add((ModificationPayload) payload); + } else if (payload instanceof CompositeModificationPayload) { + currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( + ((CompositeModificationPayload) payload).getModification()))); + } else if (payload instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( + ((CompositeModificationByteStringPayload) payload).getModification()))); } else { - log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT); + log.error("{}: Unknown payload {} received during recovery", shardName, payload); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (IOException e) { + log.error("{}: Error extracting ModificationPayload", shardName, e); } - return Collections.emptyList(); } - private static abstract class ShardRecoveryTask implements Runnable { - - final DOMStoreWriteTransaction resultingTx; - - ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) { - this.resultingTx = resultingTx; + private void commitTransaction(DOMStoreWriteTransaction transaction) { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + try { + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } catch (Exception e) { + log.error("{}: Failed to commit Tx on recovery", shardName, e); } } - private class LogRecoveryTask extends ShardRecoveryTask { - - private final List logEntries; - - LogRecoveryTask(List logEntries, DOMStoreWriteTransaction resultingTx) { - super(resultingTx); - this.logEntries = logEntries; - } - - @Override - public void run() { - for(int i = 0; i < logEntries.size(); i++) { - MutableCompositeModification.fromSerializable( - logEntries.get(i)).apply(resultingTx); - // Null out to GC quicker. - logEntries.set(i, null); + /** + * Applies the current batched log entries to the data store. + */ + void applyCurrentLogRecoveryBatch() { + log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); + + DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); + for(ModificationPayload payload: currentLogRecoveryBatch) { + try { + MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx); + } catch (Exception e) { + log.error("{}: Error extracting ModificationPayload", shardName, e); } } - } - private class SnapshotRecoveryTask extends ShardRecoveryTask { + commitTransaction(writeTx); + + currentLogRecoveryBatch = null; + } - private final byte[] snapshotBytes; + /** + * Applies a recovered snapshot to the data store. + * + * @param snapshotBytes the serialized snapshot + */ + void applyRecoveredSnapshot(final byte[] snapshotBytes) { + log.debug("{}: Applyng recovered sbapshot", shardName); - SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) { - super(resultingTx); - this.snapshotBytes = snapshotBytes; - } + DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); - @Override - public void run() { - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - // delete everything first - resultingTx.delete(YangInstanceIdentifier.builder().build()); + writeTx.write(YangInstanceIdentifier.builder().build(), node); - // Add everything from the remote node back - resultingTx.write(YangInstanceIdentifier.builder().build(), node); - } + commitTransaction(writeTx); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c479da7312..aeb4062103 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(LOG.isDebugEnabled()) { LOG.debug("Tx {} finishCanCommit", transactionId); } - // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform - // their canCommit processing. If any one fails then we'll fail canCommit. - Future> combinedFuture = - invokeCohorts(new CanCommitTransaction(transactionId).toSerializable()); + // For empty transactions return immediately + if(cohorts.size() == 0){ + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true); + } + returnFuture.set(Boolean.TRUE); + return; + } - combinedFuture.onComplete(new OnComplete>() { + final Object message = new CanCommitTransaction(transactionId).toSerializable(); + + final Iterator iterator = cohorts.iterator(); + + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Iterable responses) throws Throwable { - if(failure != null) { - if(LOG.isDebugEnabled()) { + public void onComplete(Throwable failure, Object response) throws Throwable { + if (failure != null) { + if (LOG.isDebugEnabled()) { LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); } returnFuture.setException(failure); @@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } boolean result = true; - for(Object response: responses) { - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - result = false; - break; - } - } else { - LOG.error("Unexpected response type {}", response.getClass()); - returnFuture.setException(new IllegalArgumentException( - String.format("Unexpected response type %s", response.getClass()))); - return; + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + result = false; } + } else { + LOG.error("Unexpected response type {}", response.getClass()); + returnFuture.setException(new IllegalArgumentException( + String.format("Unexpected response type %s", response.getClass()))); + return; } - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + + if(iterator.hasNext() && result){ + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(this, actorContext.getClientDispatcher()); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + } + returnFuture.set(Boolean.valueOf(result)); } - returnFuture.set(Boolean.valueOf(result)); + } - }, actorContext.getClientDispatcher()); + }; + + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 7f2f328135..64f914b19f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -485,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void onComplete(Throwable failure, ActorSelection primaryShard) { if(failure != null) { - newTxFutureCallback.onComplete(failure, null); + newTxFutureCallback.createTransactionContext(failure, null); } else { newTxFutureCallback.setPrimaryShard(primaryShard); } @@ -566,7 +566,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(transactionType == TransactionType.WRITE_ONLY && actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier); + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", + identifier, primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. @@ -612,7 +613,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard); + } Object serializedCreateMessage = new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), @@ -645,6 +648,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + createTransactionContext(failure, response); + } + + private void createTransactionContext(Throwable failure, Object response) { // Mainly checking for state violation here to perform a volatile read of "initialized" to // ensure updates to operationLimter et al are visible to this thread (ie we're doing // "piggy-back" synchronization here). diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java deleted file mode 100644 index 576010f916..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.messages; - -import java.io.Serializable; - -public class ActorNotInitialized implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index a34330bcf6..2c18eaa86f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -9,39 +9,39 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.common.base.Preconditions; +import java.io.Serializable; /** * The FindPrimary message is used to locate the primary of any given shard * */ -public class FindPrimary implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = FindPrimary.class; +public class FindPrimary implements Serializable { + private static final long serialVersionUID = 1L; private final String shardName; - private final boolean waitUntilInitialized; + private final boolean waitUntilReady; - public FindPrimary(String shardName, boolean waitUntilInitialized){ + public FindPrimary(String shardName, boolean waitUntilReady){ Preconditions.checkNotNull(shardName, "shardName should not be null"); this.shardName = shardName; - this.waitUntilInitialized = waitUntilInitialized; + this.waitUntilReady = waitUntilReady; } public String getShardName() { return shardName; } - public boolean isWaitUntilInitialized() { - return waitUntilInitialized; + public boolean isWaitUntilReady() { + return waitUntilReady; } @Override - public Object toSerializable() { - return this; - } - - public static FindPrimary fromSerializable(Object message){ - return (FindPrimary) message; + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady) + .append("]"); + return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java index 346519ed5a..82f3649939 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java @@ -8,18 +8,17 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; public class PeerAddressResolved { - private final ShardIdentifier peerId; + private final String peerId; private final String peerAddress; - public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) { + public PeerAddressResolved(String peerId, String peerAddress) { this.peerId = peerId; this.peerAddress = peerAddress; } - public ShardIdentifier getPeerId() { + public String getPeerId() { return peerId; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java index a5565020ed..4c154d43ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java @@ -8,56 +8,48 @@ package org.opendaylight.controller.cluster.datastore.messages; +import java.io.Serializable; -public class PrimaryFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryFound.class; - private final String primaryPath; +public class PrimaryFound implements Serializable { + private static final long serialVersionUID = 1L; - public PrimaryFound(final String primaryPath) { - this.primaryPath = primaryPath; - } + private final String primaryPath; - public String getPrimaryPath() { - return primaryPath; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; + public PrimaryFound(final String primaryPath) { + this.primaryPath = primaryPath; } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryFound that = (PrimaryFound) o; - if (!primaryPath.equals(that.primaryPath)) { - return false; + public String getPrimaryPath() { + return primaryPath; } - return true; - } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - @Override - public int hashCode() { - return primaryPath.hashCode(); - } + PrimaryFound that = (PrimaryFound) o; - @Override - public String toString() { - return "PrimaryFound{" + - "primaryPath='" + primaryPath + '\'' + - '}'; - } + if (!primaryPath.equals(that.primaryPath)) { + return false; + } + return true; + } - @Override - public Object toSerializable() { - return this; - } + @Override + public int hashCode() { + return primaryPath.hashCode(); + } - public static PrimaryFound fromSerializable(final Object message){ - return (PrimaryFound) message; - } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java deleted file mode 100644 index b47c91b6e5..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - -import com.google.common.base.Preconditions; - -public class PrimaryNotFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryNotFound.class; - - private final String shardName; - - public PrimaryNotFound(final String shardName){ - - Preconditions.checkNotNull(shardName, "shardName should not be null"); - - this.shardName = shardName; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryNotFound that = (PrimaryNotFound) o; - - if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return shardName != null ? shardName.hashCode() : 0; - } - - @Override - public Object toSerializable() { - return this; - } - - public static PrimaryNotFound fromSerializable(final Object message){ - return (PrimaryNotFound) message; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java index dea085153b..1d8edece1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -9,7 +9,9 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.serialization.Serialization; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; @@ -20,15 +22,15 @@ public class RegisterChangeListener implements SerializableMessage { ListenerRegistrationMessages.RegisterChangeListener.class; private final YangInstanceIdentifier path; - private final ActorPath dataChangeListenerPath; + private final ActorRef dataChangeListener; private final AsyncDataBroker.DataChangeScope scope; public RegisterChangeListener(YangInstanceIdentifier path, - ActorPath dataChangeListenerPath, + ActorRef dataChangeListener, AsyncDataBroker.DataChangeScope scope) { this.path = path; - this.dataChangeListenerPath = dataChangeListenerPath; + this.dataChangeListener = dataChangeListener; this.scope = scope; } @@ -42,7 +44,7 @@ public class RegisterChangeListener implements SerializableMessage { } public ActorPath getDataChangeListenerPath() { - return dataChangeListenerPath; + return dataChangeListener.path(); } @@ -50,14 +52,14 @@ public class RegisterChangeListener implements SerializableMessage { public ListenerRegistrationMessages.RegisterChangeListener toSerializable() { return ListenerRegistrationMessages.RegisterChangeListener.newBuilder() .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path)) - .setDataChangeListenerActorPath(dataChangeListenerPath.toString()) + .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener)) .setDataChangeScope(scope.ordinal()).build(); } - public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){ + public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){ ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable; return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()), - actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(), + actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()), AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java index bbfbbaa80b..a2f04851eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java @@ -9,32 +9,34 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.serialization.Serialization; import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; public class RegisterChangeListenerReply implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.RegisterChangeListenerReply.class; - private final ActorPath listenerRegistrationPath; + private final ActorRef listenerRegistration; - public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) { - this.listenerRegistrationPath = listenerRegistrationPath; + public RegisterChangeListenerReply(final ActorRef listenerRegistration) { + this.listenerRegistration = listenerRegistration; } public ActorPath getListenerRegistrationPath() { - return listenerRegistrationPath; + return listenerRegistration.path(); } @Override public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() { return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder() - .setListenerRegistrationPath(listenerRegistrationPath.toString()).build(); + .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build(); } public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){ ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable; return new RegisterChangeListenerReply( - actorSystem.actorFor(o.getListenerRegistrationPath()).path() + actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath()) ); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 0fb09d8231..b6250fc1cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -17,6 +17,7 @@ import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Futures; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; @@ -35,17 +36,16 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -98,8 +98,9 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; + private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private final Cache> primaryShardActorSelectionCache; + private Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -121,14 +122,6 @@ public class ActorContext { this.dispatchers = new Dispatchers(actorSystem.dispatchers()); setCachedProperties(); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() - .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) - .build(); - - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -150,6 +143,12 @@ public class ActorContext { transactionCommitOperationTimeout = new Timeout(Duration.create( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + + shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); } public DatastoreContext getDatastoreContext() { @@ -202,46 +201,30 @@ public class ActorContext { return schemaContext; } - /** - * Finds the primary shard for the given shard name - * - * @param shardName - * @return - */ - public Optional findPrimaryShard(String shardName) { - String path = findPrimaryPathOrNull(shardName); - if (path == null){ - return Optional.absent(); - } - return Optional.of(actorSystem.actorSelection(path)); - } - public Future findPrimaryShardAsync(final String shardName) { Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), - datastoreContext.getShardInitializationTimeout()); + new FindPrimary(shardName, true), shardInitializationTimeout); return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { - PrimaryFound found = PrimaryFound.fromSerializable(response); + if(response instanceof PrimaryFound) { + PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); return actorSelection; - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found primary shard %s but it's not initialized yet. " + - "Please try again later", shardName)); - } else if(response instanceof PrimaryNotFound) { - throw new PrimaryNotFoundException( - String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; + } else if(response instanceof PrimaryNotFoundException) { + throw (PrimaryNotFoundException)response; + } else if(response instanceof NoShardLeaderException) { + throw (NoShardLeaderException)response; } throw new UnknownMessageException(String.format( @@ -277,7 +260,7 @@ public class ActorContext { */ public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); + new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override @@ -286,10 +269,8 @@ public class ActorContext { LocalShardFound found = (LocalShardFound)response; LOG.debug("Local shard found {}", found.getPath()); return found.getPath(); - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found local shard for %s but it's not initialized yet.", - shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; } else if(response instanceof LocalShardNotFound) { throw new LocalShardNotFoundException( String.format("Local shard for %s does not exist.", shardName)); @@ -301,26 +282,6 @@ public class ActorContext { }, getClientDispatcher()); } - private String findPrimaryPathOrNull(String shardName) { - Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable()); - - if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound found = PrimaryFound.fromSerializable(result); - - LOG.debug("Primary found {}", found.getPrimaryPath()); - return found.getPrimaryPath(); - - } else if (result.getClass().equals(ActorNotInitialized.class)){ - throw new NotInitializedException( - String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName) - ); - - } else { - return null; - } - } - - /** * Executes an operation on a local actor and wait for it's response * @@ -428,16 +389,21 @@ public class ActorContext { * * @param message */ - public void broadcast(Object message){ - for(String shardName : configuration.getAllShardNames()){ - - Optional primary = findPrimaryShard(shardName); - if (primary.isPresent()) { - primary.get().tell(message, ActorRef.noSender()); - } else { - LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", - message.getClass().getSimpleName(), shardName); - } + public void broadcast(final Object message){ + for(final String shardName : configuration.getAllShardNames()){ + + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + LOG.warn("broadcast failed to send message {} to shard {}: {}", + message.getClass().getSimpleName(), shardName, failure); + } else { + primaryShard.tell(message, ActorRef.noSender()); + } + } + }, getClientDispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index b775cf0a99..dc83af9a75 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -124,7 +124,7 @@ module distributed-datastore-provider { } leaf shard-journal-recovery-log-batch-size { - default 5000; + default 1000; type non-zero-uint32-type; description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store."; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 8cafb46528..34f0164504 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; @@ -87,7 +88,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } protected Props newShardProps() { - return Shard.props(shardID, Collections.emptyMap(), + return Shard.props(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -102,7 +103,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected void onRecoveryComplete() { @@ -117,7 +118,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ }; TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), "testRecovery"); + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery"); assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java similarity index 99% rename from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java rename to opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index c760349b1e..0b166f5ac8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -47,7 +47,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh * * @author Thomas Pantelis */ -public class DOMConcurrentDataCommitCoordinatorTest { +public class ConcurrentDOMDataBrokerTest { private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class); private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index f6c8f07f6b..57e0e26c11 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -7,10 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -28,7 +28,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -96,7 +96,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { Assert.assertEquals("getPath", path, registerMsg.getPath()); Assert.assertEquals("getScope", scope, registerMsg.getScope()); - reply(new RegisterChangeListenerReply(getRef().path())); + reply(new RegisterChangeListenerReply(getRef())); for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new ActorNotInitialized()); + reply(new NotInitializedException("not initialized")); new Within(duration("1 seconds")) { @Override @@ -242,7 +242,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { @Override public Future answer(InvocationOnMock invocation) { proxy.close(); - return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path())); + return Futures.successful((Object)new RegisterChangeListenerReply(getRef())); } }; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index a3c5eb4b00..a8384d8758 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -147,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test - public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionWritesWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -163,8 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard. - final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. @@ -240,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionWritesWithShardNotInitiallyReady(true); + } + + @Test + public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { + testTransactionWritesWithShardNotInitiallyReady(false); + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -455,16 +465,16 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test(expected=NoShardLeaderException.class) - public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; - String shardName = "test-1"; + String shardName = "default"; // We don't want the shard to become the leader so prevent shard election from completing // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Set the leader election timeout low for the test. @@ -474,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. @@ -486,8 +497,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Override public void run() { try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); txCohort.set(writeTx.ready()); } catch(Exception e) { @@ -523,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test(expected=NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true); + } + + @Test(expected=NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { + testTransactionCommitFailureWithNoShardLeader(false); + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java index 4e61260550..1ab03b216c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -1,21 +1,22 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.RaftState; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; public class RoleChangeNotifierTest extends AbstractActorTest { @@ -51,8 +52,6 @@ public class RoleChangeNotifierTest extends AbstractActorTest { TestActorRef notifierTestActorRef = TestActorRef.create( getSystem(), RoleChangeNotifier.getProps(memberId), memberId); - RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor(); - notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); // no notification should be sent as listener has not yet registered @@ -74,6 +73,32 @@ public class RoleChangeNotifierTest extends AbstractActorTest { }}; } + + @Test + public void testHandleLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) {{ + String actorId = "testHandleLeaderStateChanged"; + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(actorId), actorId); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender()); + + // listener registers after the sate has been changed, ensure we sent the latest state change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef()); + + expectMsgClass(RegisterRoleChangeListenerReply.class); + + LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender()); + + leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 99417076bf..b676cf225c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -9,16 +9,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -34,19 +41,23 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -56,6 +67,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; @@ -66,7 +78,15 @@ public class ShardManagerTest extends AbstractActorTest { @Mock private static CountDownLatch ready; - private static ActorRef mockShardActor; + private static TestActorRef mockShardActor; + + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } @Before public void setUp() { @@ -75,9 +95,11 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); } + + mockShardActor.underlyingActor().clear(); } @After @@ -86,165 +108,346 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - DatastoreContext.Builder builder = DatastoreContext.newBuilder(); - builder.dataStoreType(shardMrgIDSuffix); return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - builder.build(), ready); + datastoreContextBuilder.build(), ready); + } + + private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { + Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor); + } + }; + + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - expectMsgEquals(duration("5 seconds"), - new PrimaryNotFound("non-existent").toSerializable()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; } @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception { + public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; } @Test - public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception { + public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - // We're passing waitUntilInitialized = true to FindPrimary so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @Test - public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindLocalShard("non-existent", false), getRef()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - assertEquals("getShardName", "non-existent", notFound.getShardName()); + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + // We're passing waitUntilInitialized = true to FindPrimary so the response should be + // delayed until we send ActorInitialized and RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); - assertTrue("Found path contains " + found.getPath().path().toString(), - found.getPath().path().toString().contains("member-1-shard-default-config")); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; } @Test - public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NotInitializedException.class); + + shardManager.tell(new ActorInitialized(), mockShardActor); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; } @Test - public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { + public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.Candidate.name()), mockShardActor); - // We're passing waitUntilInitialized = true to FindLocalShard so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } @Test - public void testOnReceiveMemberUp() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); - PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), - PrimaryFound.SERIALIZABLE_CLASS)); + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); String path = found.getPrimaryPath(); - assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + + shardManager2.underlyingActor().verifyFindPrimary(); + + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForMemberRemoved(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); } @Test - public void testOnReceiveMemberDown() throws Exception { + public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindLocalShard("non-existent", false), getRef()); + + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + assertEquals("getShardName", "non-existent", notFound.getShardName()); + }}; + } + + @Test + public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); - MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); + }}; + } - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + @Test + public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + + @Test + public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + // We're passing waitUntilInitialized = true to FindLocalShard so the response should be + // delayed until we send ActorInitialized. + Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), + new Timeout(5, TimeUnit.SECONDS)); + + shardManager.tell(new ActorInitialized(), mockShardActor); + + Object resp = Await.result(future, duration("5 seconds")); + assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); }}; } @@ -436,35 +639,52 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationReleaseReady() throws Exception { + public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + + verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); verify(ready, times(1)).countDown(); }}; } + @Test + public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + + verify(ready, times(1)).countDown(); + + }}; + } + + @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, never()).countDown(); @@ -652,4 +872,69 @@ public class ShardManagerTest extends AbstractActorTest { return delegate.create(); } } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); + } + } + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; + } + + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); + } + + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); + } + + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0fbe68665e..cc96d0d3b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -70,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -100,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -121,7 +122,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, - dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef()); + dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterChangeListenerReply.class); @@ -159,8 +160,12 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + // Use a non persistent provider because this test actually invokes persist on the journal + // this will cause all other messages to not be queued properly after that. + // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when + // it does do a persist) + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -209,7 +214,7 @@ public class ShardTest extends AbstractShardTest { onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor.path(), + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), @@ -287,7 +292,7 @@ public class ShardTest extends AbstractShardTest { final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { TestShard() { - super(shardID, Collections.singletonMap(shardID, null), + super(shardID, Collections.singletonMap(shardID.toString(), null), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -318,7 +323,7 @@ public class ShardTest extends AbstractShardTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); String address = "akka://foobar"; - shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address)); + shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address)); assertEquals("getPeerAddresses", address, ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); @@ -763,7 +768,7 @@ public class ShardTest extends AbstractShardTest { Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected boolean isLeader() { @@ -934,7 +939,7 @@ public class ShardTest extends AbstractShardTest { // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); @@ -1422,31 +1427,44 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); + + new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { - DelegatingPersistentDataProvider delegating; + class TestShard extends Shard { - @Override - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } + protected TestShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + super(name, peerAddresses, datastoreContext, schemaContext); + } - return delegating; - } + DelegatingPersistentDataProvider delegating; - @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); - } - }; + protected DataPersistenceProvider persistence() { + if(delegating == null) { + delegating = new DelegatingPersistentDataProvider(super.persistence()); + } + return delegating; + } + + @Override + protected void commitSnapshot(final long sequenceNumber) { + super.commitSnapshot(sequenceNumber); + latch.get().countDown(); + } + + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + } + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new TestShard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); } }; @@ -1459,8 +1477,9 @@ public class ShardTest extends AbstractShardTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); - shard.tell(capture, getRef()); + // Trigger creation of a snapshot by ensuring + RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1472,7 +1491,7 @@ public class ShardTest extends AbstractShardTest { latch.set(new CountDownLatch(1)); savedSnapshot.set(null); - shard.tell(capture, getRef()); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1528,13 +1547,13 @@ public class ShardTest extends AbstractShardTest { final DatastoreContext persistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); - final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), persistentContext, SCHEMA_CONTEXT); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); - final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), nonPersistentContext, SCHEMA_CONTEXT); new ShardTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 09a4532b53..c3fef611e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -60,7 +60,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } private ActorRef createShard(){ - return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 8ebb145728..e63ace3e2c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -79,7 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef createShard(){ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); + Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 647b6e7b54..d595adc8bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), isA(requestType), any(Timeout.class)); + + doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) + .when(actorContext).getTransactionCommitOperationTimeout(); } private void verifyCohortInvocations(int nCohorts, Class requestType) { @@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ListenableFuture future = proxy.canCommit(); - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); + Boolean actual = future.get(5, TimeUnit.SECONDS); - verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + assertEquals("canCommit", false, actual); + + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); } @Test(expected = TestException.class) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index ac2c079641..265ec59f1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -659,11 +661,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, TestException.class); } - @Test - public void testReadyWithInitialCreateTransactionFailure() throws Exception { - - doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( - mockActorContext).findPrimaryShardAsync(anyString()); + private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { + doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -681,7 +680,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, PrimaryNotFoundException.class); + verifyCohortFutures(proxy, toThrow.getClass()); + } + + @Test + public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock")); + } + + @Test + public void testWriteOnlyTxWithNotInitializedException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock")); + } + + @Test + public void testWriteOnlyTxWithNoShardLeaderException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock")); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java index 2a29d2c089..e206e69cda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -59,7 +59,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); @@ -133,7 +133,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java new file mode 100644 index 0000000000..696a898169 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java @@ -0,0 +1,58 @@ +package org.opendaylight.controller.cluster.datastore.messages; + +import static junit.framework.TestCase.assertEquals; +import akka.actor.Actor; +import akka.serialization.Serialization; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; + +public class RegisterChangeListenerReplyTest extends AbstractActorTest { + + private TestActorFactory factory; + + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } + + @After + public void shutDown(){ + factory.close(); + } + + @Test + public void testToSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + + RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor); + + ListenerRegistrationMessages.RegisterChangeListenerReply serialized + = registerChangeListenerReply.toSerializable(); + + assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath()); + } + + @Test + public void testFromSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + + RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor); + + ListenerRegistrationMessages.RegisterChangeListenerReply serialized + = registerChangeListenerReply.toSerializable(); + + + RegisterChangeListenerReply fromSerialized + = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized); + + assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString()); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java new file mode 100644 index 0000000000..2354a7946a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java @@ -0,0 +1,67 @@ +package org.opendaylight.controller.cluster.datastore.messages; + +import static junit.framework.TestCase.assertEquals; +import akka.actor.Actor; +import akka.serialization.Serialization; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; + +public class RegisterChangeListenerTest extends AbstractActorTest { + + private TestActorFactory factory; + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } + + @After + public void shutDown(){ + factory.close(); + } + + @Test + public void testToSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor + , AsyncDataBroker.DataChangeScope.BASE); + + ListenerRegistrationMessages.RegisterChangeListener serialized + = registerChangeListener.toSerializable(); + + NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath(); + + assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0)); + assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath()); + assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope()); + + } + + @Test + public void testFromSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor + , AsyncDataBroker.DataChangeScope.SUBTREE); + + ListenerRegistrationMessages.RegisterChangeListener serialized + = registerChangeListener.toSerializable(); + + + RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized); + + assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath()); + assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString()); + assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope()); + + + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6bd732e038..6b4f633778 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; @@ -30,14 +34,16 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -45,10 +51,16 @@ import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); + + private static class TestMessage { + } + private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -57,6 +69,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -64,15 +88,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -287,18 +324,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + transactionCreationInitialRateLimit(155L).build(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), dataStoreContext); // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); actorContext.setTxCreationLimit(1.0); @@ -320,16 +354,9 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -337,18 +364,11 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsNotGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -388,15 +408,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); @@ -431,18 +448,15 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFound("foobar")); + return Futures.successful((Object) new PrimaryNotFoundException("not found")); } }; @@ -459,7 +473,6 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); - } @Test @@ -468,18 +481,15 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new ActorNotInitialized()); + return Futures.successful((Object) new NotInitializedException("not iniislized")); } }; @@ -496,7 +506,49 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + + actorContext.broadcast(new TestMessage()); + + expectFirstMatching(shardActorRef1, TestMessage.class); + expectFirstMatching(shardActorRef2, TestMessage.class); + }}; } + private T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java index 4bd0ad818f..d62c9dbc28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -10,13 +10,14 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.UntypedActor; - import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.junit.Assert; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -31,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration; *

*/ public class MessageCollectorActor extends UntypedActor { - private List messages = new ArrayList<>(); + private final List messages = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ @@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor { } } + public void clear() { + messages.clear(); + } + public static List getAllMessages(ActorRef actor) throws Exception { FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); @@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor { return output; } + public static T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 81b6bccaf0..63878df23c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import com.google.common.base.Optional; public class MockActorContext extends ActorContext { @@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext { return executeRemoteOperationResponse; } - @Override public Optional findPrimaryShard(String shardName) { - return Optional.absent(); - } - public void setExecuteShardOperationResponse(Object response){ executeShardOperationResponse = response; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index fe40aa0fd4..810b270cfc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString; import akka.cluster.ClusterEvent; import akka.cluster.MemberStatus; import akka.cluster.UniqueAddress; -import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import scala.collection.JavaConversions; import java.util.HashSet; import java.util.Set; +import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import scala.collection.JavaConversions; public class MockClusterWrapper implements ClusterWrapper{ private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550); + private String currentMemberName = "member-1"; + + public MockClusterWrapper() { + } + + public MockClusterWrapper(String currentMemberName) { + this.currentMemberName = currentMemberName; + } @Override public void subscribeToMemberEvents(ActorRef actorRef) { @@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{ @Override public String getCurrentMemberName() { - return "member-1"; + return currentMemberName; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 4ef7d65857..0bc561f1bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore.utils; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; public class MockConfiguration implements Configuration{ - @Override public List getMemberShardNames(final String memberName) { - return Arrays.asList("default"); + private Map> shardMembers = ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build(); + + public MockConfiguration() { + } + + public MockConfiguration(Map> shardMembers) { + this.shardMembers = shardMembers; } - @Override public Optional getModuleNameFromNameSpace( + @Override + public List getMemberShardNames(final String memberName) { + return new ArrayList<>(shardMembers.keySet()); + } + @Override + public Optional getModuleNameFromNameSpace( final String nameSpace) { return Optional.absent(); } @@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{ return Arrays.asList("member-2", "member-3"); } - return Collections.emptyList(); + List members = shardMembers.get(shardName); + return members != null ? members : Collections.emptyList(); } @Override public Set getAllShardNames() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 9761ed8615..4240608036 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -21,6 +21,10 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", + "junk"); + + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); @@ -31,6 +35,7 @@ public class TestModel { private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). node(OUTER_LIST_QNAME).build(); public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index badec6f831..03634627d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -34,3 +34,105 @@ bounded-mailbox { mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } + +Member1 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "DEBUG" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2558 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-1" + ] + } + } +} + +Member2 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2559 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-2" + ] + } + } +} diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index cdce946aba..88dd0e55c5 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -414,7 +414,9 @@ public final class NetconfDevice implements RemoteDevice errors = ((RpcErrorsException)failure).getRpcErrors(); diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java index bc84734190..50676c57c1 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java @@ -243,8 +243,6 @@ public class EditConfig extends AbstractConfigNetconfOperation { } Date revision = module.getRevision(); - Preconditions.checkState(!revisionsByNamespace.containsKey(revision), - "Duplicate revision %s for namespace %s", revision, namespace); IdentityMapping identityMapping = revisionsByNamespace.get(revision); if(identityMapping == null) { diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java index 0d3370548a..283ec424ba 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java @@ -8,11 +8,16 @@ package org.opendaylight.controller.netconf.confignetconfconnector.osgi; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry; import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator; @@ -23,6 +28,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleIdentifierImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,12 +105,31 @@ final class YangStoreSnapshot implements YangStoreContext { @Override public Set getModules() { - return schemaContext.getModules(); + final Set modules = Sets.newHashSet(schemaContext.getModules()); + for (final Module module : schemaContext.getModules()) { + modules.addAll(module.getSubmodules()); + } + return modules; } @Override public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) { - return schemaContext.getModuleSource(moduleIdentifier).get(); + final Optional moduleSource = schemaContext.getModuleSource(moduleIdentifier); + if(moduleSource.isPresent()) { + return moduleSource.get(); + } else { + try { + return Iterables.find(getModules(), new Predicate() { + @Override + public boolean apply(final Module input) { + final ModuleIdentifierImpl id = new ModuleIdentifierImpl(input.getName(), Optional.fromNullable(input.getNamespace()), Optional.fromNullable(input.getRevision())); + return id.equals(moduleIdentifier); + } + }).getSource(); + } catch (final NoSuchElementException e) { + throw new IllegalArgumentException("Source for yang module " + moduleIdentifier + " not found", e); + } + } } @Override diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java index 89ce149e12..499ae01ed6 100644 --- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java +++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.netconf.mdsal.connector; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.util.HashSet; import java.util.Set; @@ -50,25 +51,39 @@ public class MdsalNetconfOperationServiceFactory implements NetconfOperationServ return transformCapabilities(currentSchemaContext.getCurrentContext()); } - static Set transformCapabilities(final SchemaContext currentContext1) { + static Set transformCapabilities(final SchemaContext currentContext) { final Set capabilities = new HashSet<>(); // [RFC6241] 8.3. Candidate Configuration Capability capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0")); - final SchemaContext currentContext = currentContext1; final Set modules = currentContext.getModules(); for (final Module module : modules) { - if(currentContext.getModuleSource(module).isPresent()) { - capabilities.add(new YangModuleCapability(module, currentContext.getModuleSource(module).get())); - } else { - LOG.warn("Missing source for module {}. This module will not be available from netconf server", - module); + Optional cap = moduleToCapability(module); + if(cap.isPresent()) { + capabilities.add(cap.get()); + } + for (final Module submodule : module.getSubmodules()) { + cap = moduleToCapability(submodule); + if(cap.isPresent()) { + capabilities.add(cap.get()); + } } } return capabilities; } + private static Optional moduleToCapability(final Module module) { + final String source = module.getSource(); + if(source !=null) { + return Optional.of(new YangModuleCapability(module, source)); + } else { + LOG.warn("Missing source for module {}. This module will not be available from netconf server", + module); + } + return Optional.absent(); + } + @Override public AutoCloseable registerCapabilityListener(final CapabilityListener listener) { return currentSchemaContext.registerCapabilityListener(listener); diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java index 06c695c25a..f4017fbe58 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -45,6 +45,9 @@ public class NetconfClientSessionNegotiator extends private static final XPathExpression sessionIdXPath = XMLNetconfUtil .compileXPath("/netconf:hello/netconf:session-id"); + private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil + .compileXPath("/hello/session-id"); + private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0"; protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences, @@ -113,16 +116,22 @@ public class NetconfClientSessionNegotiator extends } private long extractSessionId(final Document doc) { - final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE); - Preconditions.checkState(sessionIdNode != null, ""); - String textContent = sessionIdNode.getTextContent(); - if (textContent == null || textContent.equals("")) { - throw new IllegalStateException("Session id not received from server"); + String textContent = getSessionIdWithXPath(doc, sessionIdXPath); + if (Strings.isNullOrEmpty(textContent)) { + textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace); + if (Strings.isNullOrEmpty(textContent)) { + throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc)); + } } return Long.valueOf(textContent); } + private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) { + final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE); + return sessionIdNode != null ? sessionIdNode.getTextContent() : null; + } + @Override protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel, final NetconfHelloMessage message) throws NetconfDocumentedException { diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java index ab73126021..d7d8660ae4 100644 --- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java +++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java @@ -61,7 +61,7 @@ public class SSHTest { @AfterClass public static void tearDown() throws Exception { hashedWheelTimer.stop(); - nettyGroup.shutdownGracefully().await(); + nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS); minaTimerEx.shutdownNow(); nioExec.shutdownNow(); } diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java index 5cd17a2331..404885db7e 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java @@ -94,9 +94,9 @@ public final class NetconfHelloMessage extends NetconfMessage { private static boolean isHelloMessage(final Document document) { XmlElement element = XmlElement.fromDomElement(document.getDocumentElement()); try { + // accept even if hello has no namespace return element.getName().equals(HELLO_TAG) && - element.hasNamespace() && - element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0); + (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0)); } catch (MissingNameSpaceException e) { // Cannot happen, since we check for hasNamespace throw new IllegalStateException(e); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java index 61b23202c3..3c6b6ccab9 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.netconf.util.messages; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.collect.Collections2; import java.util.Collection; import java.util.List; @@ -59,9 +60,13 @@ public final class NetconfMessageUtil { public static Collection extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException { XmlElement responseElement = XmlElement.fromDomDocument(doc); - XmlElement capabilitiesElement = responseElement - .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES); - List caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY); + // Extract child element from with or without(fallback) the same namespace + Optional capabilitiesElement = responseElement + .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES) + .or(responseElement + .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES)); + + List caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY); return Collections2.transform(caps, new Function() { @Override