Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransactionMessageRetrySupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import java.io.Closeable;
16 import java.util.LinkedHashSet;
17 import java.util.Set;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.duration.FiniteDuration;
22
23 /**
24  * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
25  *
26  * @author Thomas Pantelis
27  */
28 @Deprecated(since = "9.0.0", forRemoval = true)
29 class ShardTransactionMessageRetrySupport implements Closeable {
30     private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
31
32     static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
33
34     private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
35     private final Shard shard;
36
37     ShardTransactionMessageRetrySupport(final Shard shard) {
38         this.shard = shard;
39     }
40
41     void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
42         LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
43
44         MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
45
46         FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
47         messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
48                 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
49
50         messagesToRetry.add(messageInfo);
51     }
52
53     void retryMessages() {
54         if (messagesToRetry.isEmpty()) {
55             return;
56         }
57
58         MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
59         messagesToRetry.clear();
60
61         for (MessageInfo info: copy) {
62             LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
63             info.retry(shard);
64         }
65     }
66
67     void onTimerMessage(final Object message) {
68         MessageInfo messageInfo = (MessageInfo)message;
69
70         LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
71
72         messagesToRetry.remove(messageInfo);
73         messageInfo.timedOut(shard);
74     }
75
76     @Override
77     public void close() {
78         for (MessageInfo info: messagesToRetry) {
79             info.timedOut(shard);
80         }
81
82         messagesToRetry.clear();
83     }
84
85     private static final class MessageInfo {
86         final Object message;
87         final ActorRef replyTo;
88         final String failureMessage;
89         Cancellable timer;
90
91         MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
92             this.message = message;
93             this.replyTo = replyTo;
94             this.failureMessage = requireNonNull(failureMessage);
95         }
96
97         void retry(final Shard shard) {
98             timer.cancel();
99             shard.getSelf().tell(message, replyTo);
100         }
101
102         void timedOut(final Shard shard) {
103             replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
104                     shard.getSelf());
105         }
106     }
107 }