X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dummy-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fdummy%2Fdatastore%2FDummyShard.java;h=62eb392f2d58d3b55d04da2c00d1e44ebf55919a;hp=0b72a32f1033884ba983c71b2651a4a88ad6c7b1;hb=0f02b7edeb1454c1a568f0f1b050757e7503ddf7;hpb=69343f4c1b9fee2e28186fb7bd4482bf7d4614cd diff --git a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java index 0b72a32f10..62eb392f2d 100644 --- a/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java +++ b/opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java @@ -9,10 +9,10 @@ package org.opendaylight.controller.dummy.datastore; import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.japi.Creator; +import akka.actor.UntypedAbstractActor; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -23,10 +23,11 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DummyShard extends UntypedActor{ +public class DummyShard extends UntypedAbstractActor { + private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class); + private final Configuration configuration; private final String followerId; - private final Logger LOG = LoggerFactory.getLogger(DummyShard.class); private long lastMessageIndex = -1; private long lastMessageSize = 0; private Stopwatch appendEntriesWatch; @@ -38,20 +39,16 @@ public class DummyShard extends UntypedActor{ } @Override - public void onReceive(Object o) throws Exception { - if(o instanceof RequestVote){ - RequestVote req = (RequestVote) o; + public void onReceive(Object message) throws Exception { + if (message instanceof RequestVote) { + RequestVote req = (RequestVote) message; sender().tell(new RequestVoteReply(req.getTerm(), true), self()); - } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) { - AppendEntries req = AppendEntries.fromSerializable(o); - handleAppendEntries(req); - } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) { - InstallSnapshot req = InstallSnapshot.fromSerializable(o); - handleInstallSnapshot(req); - } else if(o instanceof InstallSnapshot){ - handleInstallSnapshot((InstallSnapshot) o); + } else if (message instanceof AppendEntries) { + handleAppendEntries((AppendEntries) message); + } else if (message instanceof InstallSnapshot) { + handleInstallSnapshot((InstallSnapshot) message); } else { - LOG.error("Unknown message : {}", o.getClass()); + LOG.error("Unknown message : {}", message.getClass()); } } @@ -63,11 +60,11 @@ public class DummyShard extends UntypedActor{ LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}", followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size()); - if(appendEntriesWatch != null){ + if (appendEntriesWatch != null) { long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS); - if(elapsed >= 5){ - LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" + - ", leaderCommit = {}, prevLogIndex = {}, size = {}", + if (elapsed >= 5) { + LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" + + ", leaderCommit = {}, prevLogIndex = {}, size = {}", elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size()); } appendEntriesWatch.reset().start(); @@ -75,8 +72,9 @@ public class DummyShard extends UntypedActor{ appendEntriesWatch = Stopwatch.createStarted(); } - if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){ - LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex()); + if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) { + LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, + req.getLeaderCommit(), req.getPrevLogIndex()); } lastMessageIndex = req.getLeaderCommit(); @@ -84,7 +82,7 @@ public class DummyShard extends UntypedActor{ long lastIndex = req.getLeaderCommit(); if (req.getEntries().size() > 0) { - for(ReplicatedLogEntry entry : req.getEntries()) { + for (ReplicatedLogEntry entry : req.getEntries()) { lastIndex = entry.getIndex(); } } @@ -101,33 +99,16 @@ public class DummyShard extends UntypedActor{ if (!ignore) { LOG.info("{} - Randomizing delay : {}", followerId, delay); Thread.sleep(delay); - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } } else { - sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self()); + sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), self()); } } public static Props props(Configuration configuration, final String followerId) { - - return Props.create(new DummyShardCreator(configuration, followerId)); - } - - private static class DummyShardCreator implements Creator { - - private static final long serialVersionUID = 1L; - private final Configuration configuration; - private final String followerId; - - DummyShardCreator(Configuration configuration, String followerId) { - this.configuration = configuration; - this.followerId = followerId; - } - - @Override - public DummyShard create() throws Exception { - return new DummyShard(configuration, followerId); - } + return Props.create(DummyShard.class, configuration, followerId); } - }