Bug 3020: Add version to AppendEntries and AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-dummy-distributed-datastore / src / main / java / org / opendaylight / controller / dummy / datastore / DummyShard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.dummy.datastore;
10
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
20 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 public class DummyShard extends UntypedActor{
28     private final Configuration configuration;
29     private final String followerId;
30     private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
31     private long lastMessageIndex  = -1;
32     private long lastMessageSize = 0;
33     private Stopwatch appendEntriesWatch;
34
35     public DummyShard(Configuration configuration, String followerId) {
36         this.configuration = configuration;
37         this.followerId = followerId;
38         LOG.info("Creating : {}", followerId);
39     }
40
41     @Override
42     public void onReceive(Object o) throws Exception {
43         if(o instanceof RequestVote){
44             RequestVote req = (RequestVote) o;
45             sender().tell(new RequestVoteReply(req.getTerm(), true), self());
46         } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
47             AppendEntries req = AppendEntries.fromSerializable(o);
48             handleAppendEntries(req);
49         } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
50             InstallSnapshot req = InstallSnapshot.fromSerializable(o);
51             handleInstallSnapshot(req);
52         } else if(o instanceof InstallSnapshot){
53             handleInstallSnapshot((InstallSnapshot) o);
54         } else {
55             LOG.error("Unknown message : {}", o.getClass());
56         }
57     }
58
59     private void handleInstallSnapshot(InstallSnapshot req) {
60         sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
61     }
62
63     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
64         LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
65                 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
66
67         if(appendEntriesWatch != null){
68             long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
69             if(elapsed >= 5){
70                 LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds" +
71                                 ", leaderCommit = {}, prevLogIndex = {}, size = {}",
72                         elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
73             }
74             appendEntriesWatch.reset().start();
75         } else {
76             appendEntriesWatch = Stopwatch.createStarted();
77         }
78
79         if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
80             LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
81         }
82
83         lastMessageIndex = req.getLeaderCommit();
84         lastMessageSize = req.getEntries().size();
85
86         long lastIndex = req.getLeaderCommit();
87         if (req.getEntries().size() > 0) {
88             for(ReplicatedLogEntry entry : req.getEntries()) {
89                 lastIndex = entry.getIndex();
90             }
91         }
92
93         if (configuration.shouldCauseTrouble() && req.getEntries().size() > 0) {
94             boolean ignore = false;
95
96             if (configuration.shouldDropReplies()) {
97                 ignore = Math.random() > 0.5;
98             }
99
100             long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
101
102             if (!ignore) {
103                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
104                 Thread.sleep(delay);
105                 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
106                         DataStoreVersions.CURRENT_VERSION), self());
107             }
108         } else {
109             sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
110                     DataStoreVersions.CURRENT_VERSION), self());
111         }
112     }
113
114     public static Props props(Configuration configuration, final String followerId) {
115
116         return Props.create(new DummyShardCreator(configuration, followerId));
117     }
118
119     private static class DummyShardCreator implements Creator<DummyShard> {
120
121         private static final long serialVersionUID = 1L;
122         private final Configuration configuration;
123         private final String followerId;
124
125         DummyShardCreator(Configuration configuration, String followerId) {
126             this.configuration = configuration;
127             this.followerId = followerId;
128         }
129
130         @Override
131         public DummyShard create() throws Exception {
132             return new DummyShard(configuration, followerId);
133         }
134     }
135
136 }