BUG-8618: fix test driver
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Collection;
15 import java.util.HashSet;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.ScheduledFuture;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
22 import org.opendaylight.yangtools.yang.common.QName;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 abstract class AbstractTransactionHandler {
28     private enum State {
29         RUNNING,
30         WAITING,
31         SUCCESSFUL,
32         FAILED,
33     }
34
35     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
36
37     static final int SECOND_AS_NANO = 1_000_000_000;
38     //2^20 as in the model
39     static final int MAX_ITEM = 1048576;
40
41     static final QName ID_INTS =
42             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
43     static final QName ID =
44             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
45     static final QName ITEM =
46             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
47     static final QName NUMBER =
48             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
49
50     public static final QName ID_INT =
51             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
52     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
53     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
54
55     static final long INIT_TX_TIMEOUT_SECONDS = 125;
56
57     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
58
59     private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
60     private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
61     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
62     private final long runtimeNanos;
63     private final long delayNanos;
64
65     private ScheduledFuture<?> scheduledFuture;
66     private long txCounter;
67     private State state;
68
69     AbstractTransactionHandler(final TransactionsParams params) {
70         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
71         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
72     }
73
74     final synchronized void doStart() {
75         scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
76         stopwatch.start();
77         state = State.RUNNING;
78     }
79
80     private synchronized void execute() {
81         switch (state) {
82             case FAILED:
83                 // This could happen due to scheduling artifacts
84                 break;
85             case RUNNING:
86                 runningExecute();
87                 break;
88             default:
89                 throw new IllegalStateException("Unhandled state " + state);
90         }
91     }
92
93     private void runningExecute() {
94         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
95         if (elapsed >= runtimeNanos) {
96             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
97             if (!checkSuccessful()) {
98                 state = State.WAITING;
99                 scheduledFuture.cancel(false);
100                 scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
101             }
102
103             return;
104         }
105
106         // Not completed yet: create a transaction and hook it up
107         final long txId = txCounter++;
108         final ListenableFuture<Void> execFuture = execWrite(txId);
109         LOG.debug("New future #{} allocated", txId);
110
111         // Ordering is important: we need to add the future before hooking the callback
112         futures.add(execFuture);
113         Futures.addCallback(execFuture, new FutureCallback<Void>() {
114             @Override
115             public void onSuccess(final Void result) {
116                 txSuccess(execFuture, txId);
117             }
118
119             @Override
120             public void onFailure(final Throwable cause) {
121                 txFailure(execFuture, txId, cause);
122             }
123         }, executor);
124     }
125
126     final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
127         LOG.debug("Future #{} completed successfully", txId);
128         futures.remove(execFuture);
129
130         switch (state) {
131             case FAILED:
132             case RUNNING:
133                 // No-op
134                 break;
135             case WAITING:
136                 checkSuccessful();
137                 break;
138             default:
139                 throw new IllegalStateException("Unhandled state " + state);
140         }
141     }
142
143     final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
144         LOG.debug("Future #{} failed", txId, cause);
145         futures.remove(execFuture);
146
147         switch (state) {
148             case FAILED:
149                 // no-op
150                 break;
151             case RUNNING:
152             case WAITING:
153                 state = State.FAILED;
154                 scheduledFuture.cancel(false);
155                 runFailed(cause);
156                 break;
157             default:
158                 throw new IllegalStateException("Unhandled state " + state);
159         }
160     }
161
162     private void checkComplete() {
163         final int size = futures.size();
164         if (size == 0) {
165             return;
166         }
167
168         int offset = 0;
169         for (ListenableFuture<Void> future : futures) {
170             try {
171                 future.get(0, TimeUnit.NANOSECONDS);
172             } catch (final TimeoutException e) {
173                 LOG.warn("Future #{}/{} not completed yet", offset, size);
174             } catch (final ExecutionException e) {
175                 LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
176             } catch (final InterruptedException e) {
177                 LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
178             }
179
180             ++offset;
181         }
182
183         state = State.FAILED;
184         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
185     }
186
187     private boolean checkSuccessful() {
188         if (futures.isEmpty()) {
189             LOG.debug("Completed waiting for all futures");
190             state = State.SUCCESSFUL;
191             scheduledFuture.cancel(false);
192             runSuccessful(txCounter);
193             return true;
194         }
195
196         return false;
197     }
198
199     abstract ListenableFuture<Void> execWrite(final long txId);
200
201     abstract void runFailed(Throwable cause);
202
203     abstract void runSuccessful(long allTx);
204
205     abstract void runTimedOut(Exception cause);
206 }