Fix warnings/javadocs in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransactionMessageRetrySupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.io.Closeable;
14 import java.util.LinkedHashSet;
15 import java.util.Set;
16 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
20
21 /**
22  * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
23  *
24  * @author Thomas Pantelis
25  */
26 class ShardTransactionMessageRetrySupport implements Closeable {
27     private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
28
29     static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
30
31     private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
32     private final Shard shard;
33
34     ShardTransactionMessageRetrySupport(Shard shard) {
35         this.shard = shard;
36     }
37
38     void addMessageToRetry(Object message, ActorRef replyTo, String failureMessage) {
39         LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
40
41         MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
42
43         FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
44         messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
45                 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
46
47         messagesToRetry.add(messageInfo);
48     }
49
50     void retryMessages() {
51         if (messagesToRetry.isEmpty()) {
52             return;
53         }
54
55         MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
56         messagesToRetry.clear();
57
58         for (MessageInfo info: copy) {
59             LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
60             info.retry(shard);
61         }
62     }
63
64     void onTimerMessage(Object message) {
65         MessageInfo messageInfo = (MessageInfo)message;
66
67         LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
68
69         messagesToRetry.remove(messageInfo);
70         messageInfo.timedOut(shard);
71     }
72
73     @Override
74     public void close() {
75         for (MessageInfo info: messagesToRetry) {
76             info.timedOut(shard);
77         }
78
79         messagesToRetry.clear();
80     }
81
82     private static class MessageInfo {
83         final Object message;
84         final ActorRef replyTo;
85         final String failureMessage;
86         Cancellable timer;
87
88         MessageInfo(Object message, ActorRef replyTo, String failureMessage) {
89             this.message = message;
90             this.replyTo = replyTo;
91             this.failureMessage = failureMessage;
92         }
93
94         void retry(Shard shard) {
95             timer.cancel();
96             shard.getSelf().tell(message, replyTo);
97         }
98
99         void timedOut(Shard shard) {
100             replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
101                     shard.getSelf());
102         }
103     }
104 }