Merge "Bug 2538: Remove redundant Augmentation checks and tests"
[controller.git] / opendaylight / md-sal / sal-dummy-distributed-datastore / src / main / java / org / opendaylight / controller / dummy / datastore / DummyShard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.dummy.datastore;
10
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
18 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class DummyShard extends UntypedActor{
25     private final Configuration configuration;
26     private final String followerId;
27     private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
28     private long lastMessageIndex  = -1;
29     private long lastMessageSize = 0;
30
31     public DummyShard(Configuration configuration, String followerId) {
32         this.configuration = configuration;
33         this.followerId = followerId;
34         LOG.info("Creating : {}", followerId);
35     }
36
37     @Override
38     public void onReceive(Object o) throws Exception {
39         if(o instanceof RequestVote){
40             RequestVote req = (RequestVote) o;
41             sender().tell(new RequestVoteReply(req.getTerm(), true), self());
42         } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
43             AppendEntries req = AppendEntries.fromSerializable(o);
44             handleAppendEntries(req);
45         } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
46             InstallSnapshot req = InstallSnapshot.fromSerializable(o);
47             handleInstallSnapshot(req);
48         } else if(o instanceof InstallSnapshot){
49             handleInstallSnapshot((InstallSnapshot) o);
50         } else {
51             LOG.error("Unknown message : {}", o.getClass());
52         }
53     }
54
55     private void handleInstallSnapshot(InstallSnapshot req) {
56         sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
57     }
58
59     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
60
61         LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
62                 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
63
64         if(lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0){
65             LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId, req.getLeaderCommit(), req.getPrevLogIndex());
66         }
67
68         lastMessageIndex = req.getLeaderCommit();
69         lastMessageSize = req.getEntries().size();
70
71         long lastIndex = req.getLeaderCommit();
72         if (req.getEntries().size() > 0) {
73             for(ReplicatedLogEntry entry : req.getEntries()) {
74                 lastIndex = entry.getIndex();
75             }
76         }
77
78         if (configuration.shouldCauseTrouble() && req.getEntries().size() > 0) {
79             boolean ignore = false;
80
81             if (configuration.shouldDropReplies()) {
82                 ignore = Math.random() > 0.5;
83             }
84
85             long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
86
87             if (!ignore) {
88                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
89                 Thread.sleep(delay);
90                 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
91             }
92         } else {
93             sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
94         }
95     }
96
97     public static Props props(Configuration configuration, final String followerId) {
98
99         return Props.create(new DummyShardCreator(configuration, followerId));
100     }
101
102     private static class DummyShardCreator implements Creator<DummyShard> {
103
104         private static final long serialVersionUID = 1L;
105         private final Configuration configuration;
106         private final String followerId;
107
108         DummyShardCreator(Configuration configuration, String followerId) {
109             this.configuration = configuration;
110             this.followerId = followerId;
111         }
112
113         @Override
114         public DummyShard create() throws Exception {
115             return new DummyShard(configuration, followerId);
116         }
117     }
118
119 }