Fix checkstyle violations in /sal-connector-api and sal-dummy-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-dummy-distributed-datastore / src / main / java / org / opendaylight / controller / dummy / datastore / DummyShard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.dummy.datastore;
10
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
20 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 public class DummyShard extends UntypedActor {
28     private static final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
29
30     private final Configuration configuration;
31     private final String followerId;
32     private long lastMessageIndex  = -1;
33     private long lastMessageSize = 0;
34     private Stopwatch appendEntriesWatch;
35
36     public DummyShard(Configuration configuration, String followerId) {
37         this.configuration = configuration;
38         this.followerId = followerId;
39         LOG.info("Creating : {}", followerId);
40     }
41
42     @Override
43     public void onReceive(Object message) throws Exception {
44         if (message instanceof RequestVote) {
45             RequestVote req = (RequestVote) message;
46             sender().tell(new RequestVoteReply(req.getTerm(), true), self());
47         } else if (message instanceof AppendEntries) {
48             handleAppendEntries((AppendEntries) message);
49         } else if (message instanceof InstallSnapshot) {
50             handleInstallSnapshot((InstallSnapshot) message);
51         } else {
52             LOG.error("Unknown message : {}", message.getClass());
53         }
54     }
55
56     private void handleInstallSnapshot(InstallSnapshot req) {
57         sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
58     }
59
60     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
61         LOG.info("{} - Received AppendEntries message : leader term = {}, index = {}, prevLogIndex = {}, size = {}",
62                 followerId, req.getTerm(),req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
63
64         if (appendEntriesWatch != null) {
65             long elapsed = appendEntriesWatch.elapsed(TimeUnit.SECONDS);
66             if (elapsed >= 5) {
67                 LOG.error("More than 5 seconds since last append entry, elapsed Time = {} seconds"
68                                 + ", leaderCommit = {}, prevLogIndex = {}, size = {}",
69                         elapsed, req.getLeaderCommit(), req.getPrevLogIndex(), req.getEntries().size());
70             }
71             appendEntriesWatch.reset().start();
72         } else {
73             appendEntriesWatch = Stopwatch.createStarted();
74         }
75
76         if (lastMessageIndex == req.getLeaderCommit() && req.getEntries().size() > 0 && lastMessageSize > 0) {
77             LOG.error("{} - Duplicate message with leaderCommit = {} prevLogIndex = {} received", followerId,
78                     req.getLeaderCommit(), req.getPrevLogIndex());
79         }
80
81         lastMessageIndex = req.getLeaderCommit();
82         lastMessageSize = req.getEntries().size();
83
84         long lastIndex = req.getLeaderCommit();
85         if (req.getEntries().size() > 0) {
86             for (ReplicatedLogEntry entry : req.getEntries()) {
87                 lastIndex = entry.getIndex();
88             }
89         }
90
91         if (configuration.shouldCauseTrouble() && req.getEntries().size() > 0) {
92             boolean ignore = false;
93
94             if (configuration.shouldDropReplies()) {
95                 ignore = Math.random() > 0.5;
96             }
97
98             long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
99
100             if (!ignore) {
101                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
102                 Thread.sleep(delay);
103                 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
104                         DataStoreVersions.CURRENT_VERSION), self());
105             }
106         } else {
107             sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm(),
108                     DataStoreVersions.CURRENT_VERSION), self());
109         }
110     }
111
112     public static Props props(Configuration configuration, final String followerId) {
113
114         return Props.create(new DummyShardCreator(configuration, followerId));
115     }
116
117     private static class DummyShardCreator implements Creator<DummyShard> {
118
119         private static final long serialVersionUID = 1L;
120         private final Configuration configuration;
121         private final String followerId;
122
123         DummyShardCreator(Configuration configuration, String followerId) {
124             this.configuration = configuration;
125             this.followerId = followerId;
126         }
127
128         @Override
129         public DummyShard create() throws Exception {
130             return new DummyShard(configuration, followerId);
131         }
132     }
133 }