ea0749a2d9d11ee615381f462c69aee4d62de0d2
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 abstract class AbstractTransactionHandler {
29     private enum State {
30         RUNNING,
31         WAITING,
32         SUCCESSFUL,
33         FAILED,
34     }
35
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
37
38     static final int SECOND_AS_NANO = 1000000000;
39     //2^20 as in the model
40     static final int MAX_ITEM = 1048576;
41
42     static final QName ID_INTS =
43             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
44     static final QName ID =
45             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
46     static final QName ITEM =
47             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
48     static final QName NUMBER =
49             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
50
51     public static final QName ID_INT =
52             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
53     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
54     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
55
56     static final long INIT_TX_TIMEOUT_SECONDS = 125;
57
58     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
59
60     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
61     private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
62     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
63     private final long runtimeNanos;
64     private final long delayNanos;
65
66     private ScheduledFuture<?> scheduledFuture;
67     private long txCounter;
68     private State state;
69
70     AbstractTransactionHandler(final TransactionsParams params) {
71         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
72         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
73     }
74
75     final synchronized void doStart() {
76         scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
77         stopwatch.start();
78         state = State.RUNNING;
79     }
80
81     private synchronized void execute() {
82         switch (state) {
83             case FAILED:
84                 // This could happen due to scheduling artifacts
85                 break;
86             case RUNNING:
87                 runningExecute();
88                 break;
89             default:
90                 throw new IllegalStateException("Unhandled state " + state);
91         }
92     }
93
94     private void runningExecute() {
95         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
96         if (elapsed >= runtimeNanos) {
97             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
98             if (!checkSuccessful()) {
99                 state = State.WAITING;
100                 scheduledFuture.cancel(false);
101                 scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
102                 executor.shutdown();
103             }
104
105             return;
106         }
107
108         // Not completed yet: create a transaction and hook it up
109         final long txId = txCounter++;
110         final ListenableFuture<Void> execFuture = execWrite(txId);
111         LOG.debug("New future #{} allocated", txId);
112
113         // Ordering is important: we need to add the future before hooking the callback
114         futures.add(execFuture);
115         Futures.addCallback(execFuture, new FutureCallback<Void>() {
116             @Override
117             public void onSuccess(final Void result) {
118                 txSuccess(execFuture, txId);
119             }
120
121             @Override
122             public void onFailure(final Throwable cause) {
123                 txFailure(execFuture, txId, cause);
124             }
125         });
126     }
127
128     final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
129         LOG.debug("Future #{} completed successfully", txId);
130         futures.remove(execFuture);
131
132         switch (state) {
133             case FAILED:
134             case RUNNING:
135                 // No-op
136                 break;
137             case WAITING:
138                 checkSuccessful();
139                 break;
140             default:
141                 throw new IllegalStateException("Unhandled state " + state);
142         }
143     }
144
145     final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
146         LOG.debug("Future #{} failed", txId, cause);
147         futures.remove(execFuture);
148
149         switch (state) {
150             case FAILED:
151                 // no-op
152                 break;
153             case RUNNING:
154             case WAITING:
155                 state = State.FAILED;
156                 scheduledFuture.cancel(false);
157                 executor.shutdown();
158                 runFailed(cause);
159                 break;
160             default:
161                 throw new IllegalStateException("Unhandled state " + state);
162         }
163     }
164
165     private synchronized void checkComplete() {
166         final int size = futures.size();
167         if (size == 0) {
168             return;
169         }
170
171         int offset = 0;
172         for (ListenableFuture<Void> future : futures) {
173             try {
174                 future.get(0, TimeUnit.NANOSECONDS);
175             } catch (final TimeoutException e) {
176                 LOG.warn("Future #{}/{} not completed yet", offset, size);
177             } catch (final ExecutionException e) {
178                 LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
179             } catch (final InterruptedException e) {
180                 LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
181             }
182
183             ++offset;
184         }
185
186         state = State.FAILED;
187         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
188     }
189
190     private boolean checkSuccessful() {
191         if (futures.isEmpty()) {
192             LOG.debug("Completed waiting for all futures");
193             state = State.SUCCESSFUL;
194             scheduledFuture.cancel(false);
195             executor.shutdown();
196             runSuccessful(txCounter);
197             return true;
198         }
199
200         return false;
201     }
202
203     abstract ListenableFuture<Void> execWrite(final long txId);
204
205     abstract void runFailed(Throwable cause);
206
207     abstract void runSuccessful(long allTx);
208
209     abstract void runTimedOut(Exception cause);
210 }