2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.dummy.datastore;
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
20 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 public class DummyShard extends UntypedActor {
28 private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
30 private final Configuration configuration;
31 private final String followerId;
32 private long lastMessageIndex = -1;
33 private long lastMessageSize = 0;
34 private Stopwatch appendEntriesWatch;
36 public DummyShard(Configuration configuration, String followerId) {
37 this.configuration = configuration;
38 this.followerId = followerId;
39 LOG.info("Creating : {}", followerId);
43 public void onReceive(Object message) throws Exception {
44 if (message instanceof RequestVote) {
45 RequestVote req = (RequestVote) message;
46 sender().tell(new RequestVoteReply(req.getTerm(), true), self());
47 } else if (message instanceof AppendEntries) {
48 handleAppendEntries((AppendEntries) message);
49 } else if (message instanceof InstallSnapshot) {
50 handleInstallSnapshot((InstallSnapshot) message);
52 LOG.error("Unknown message : {}", message.getClass());
56 private void handleInstallSnapshot(InstallSnapshot req) {
57 sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
60 protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
61 LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
62 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
64 if (appendEntriesWatch != null) {
65 long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
67 LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
68 + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
69 elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
71 appendEntriesWatch.reset().start();
73 appendEntriesWatch = Stopwatch.createStarted();
76 if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
77 LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
78 req.getLeaderCommit(), req.getPrevLogIndex());
81 lastMessageIndex = req.getLeaderCommit();
82 lastMessageSize = req.getEntries().size();
84 long lastIndex = req.getLeaderCommit();
85 if (req.getEntries().size() > 0) {
86 for (ReplicatedLogEntry entry : req.getEntries()) {
87 lastIndex = entry.getIndex();
91 if (configuration.shouldCauseTrouble() && req.getEntries().size() > 0) {
92 boolean ignore = false;
94 if (configuration.shouldDropReplies()) {
95 ignore = Math.random() > 0.5;
98 long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
101 LOG.info("{} - Randomizing delay : {}", followerId, delay);
103 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
104 DataStoreVersions.CURRENT_VERSION), self());
107 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
108 DataStoreVersions.CURRENT_VERSION), self());
112 public static Props props(Configuration configuration, final String followerId) {
114 return Props.create(new DummyShardCreator(configuration, followerId));
117 private static class DummyShardCreator implements Creator<DummyShard> {
119 private static final long serialVersionUID = 1L;
120 private final Configuration configuration;
121 private final String followerId;
123 DummyShardCreator(Configuration configuration, String followerId) {
124 this.configuration = configuration;
125 this.followerId = followerId;
129 public DummyShard create() throws Exception {
130 return new DummyShard(configuration, followerId);