Bail faster on not found module
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransactionMessageRetrySupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import java.io.Closeable;
16 import java.util.LinkedHashSet;
17 import java.util.Set;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.duration.FiniteDuration;
22
23 /**
24  * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
25  *
26  * @author Thomas Pantelis
27  */
28 class ShardTransactionMessageRetrySupport implements Closeable {
29     private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
30
31     static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
32
33     private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
34     private final Shard shard;
35
36     ShardTransactionMessageRetrySupport(final Shard shard) {
37         this.shard = shard;
38     }
39
40     void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
41         LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
42
43         MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
44
45         FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
46         messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
47                 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
48
49         messagesToRetry.add(messageInfo);
50     }
51
52     void retryMessages() {
53         if (messagesToRetry.isEmpty()) {
54             return;
55         }
56
57         MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
58         messagesToRetry.clear();
59
60         for (MessageInfo info: copy) {
61             LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
62             info.retry(shard);
63         }
64     }
65
66     void onTimerMessage(final Object message) {
67         MessageInfo messageInfo = (MessageInfo)message;
68
69         LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
70
71         messagesToRetry.remove(messageInfo);
72         messageInfo.timedOut(shard);
73     }
74
75     @Override
76     public void close() {
77         for (MessageInfo info: messagesToRetry) {
78             info.timedOut(shard);
79         }
80
81         messagesToRetry.clear();
82     }
83
84     private static class MessageInfo {
85         final Object message;
86         final ActorRef replyTo;
87         final String failureMessage;
88         Cancellable timer;
89
90         MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
91             this.message = message;
92             this.replyTo = replyTo;
93             this.failureMessage = requireNonNull(failureMessage);
94         }
95
96         void retry(final Shard shard) {
97             timer.cancel();
98             shard.getSelf().tell(message, replyTo);
99         }
100
101         void timedOut(final Shard shard) {
102             replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
103                     shard.getSelf());
104         }
105     }
106 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.