f23e7ec194b77e02c205def4e54cca003ffb27ba
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.Queue;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 abstract class AbstractTransactionHandler {
34     private abstract static class Phase {
35         abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
36
37         abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
38     }
39
40     private static final class Running extends Phase {
41         private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
42         private Throwable failure;
43
44         void addFuture(final ListenableFuture<Void> execFuture) {
45             futures.add(execFuture);
46         }
47
48         @Override
49         void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
50             futures.remove(execFuture);
51         }
52
53         @Override
54         void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
55             futures.remove(execFuture);
56             if (failure != null) {
57                 failure = cause;
58             }
59         }
60
61         Optional<Throwable> getFailure() {
62             return Optional.ofNullable(failure);
63         }
64     }
65
66     private final class Collecting extends Phase {
67         private final List<ListenableFuture<Void>> futures;
68         private boolean done;
69
70         Collecting(final Collection<ListenableFuture<Void>> futures) {
71             this.futures = new ArrayList<>(futures);
72         }
73
74         @Override
75         void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
76             futures.remove(execFuture);
77             if (futures.isEmpty() && !done) {
78                 LOG.debug("All futures completed successfully.");
79                 runSuccessful(txCounter);
80             }
81         }
82
83         @Override
84         void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
85             futures.remove(execFuture);
86             done = true;
87             runFailed(cause);
88         }
89     }
90
91     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
92
93     static final int SECOND_AS_NANO = 1000000000;
94     //2^20 as in the model
95     static final int MAX_ITEM = 1048576;
96
97     static final QName ID_INTS =
98             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
99     static final QName ID =
100             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
101     static final QName ITEM =
102             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
103     static final QName NUMBER =
104             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
105
106     public static final QName ID_INT =
107             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
108     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
109     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
110
111     static final long INIT_TX_TIMEOUT_SECONDS = 125;
112
113     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
114
115     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
116     private final Stopwatch stopwatch = Stopwatch.createStarted();
117     private final long runtimeNanos;
118     private final long delayNanos;
119
120     private ScheduledFuture<?> scheduledFuture;
121     private long txCounter;
122     @GuardedBy("this")
123     private Phase phase;
124
125     AbstractTransactionHandler(final TransactionsParams params) {
126         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
127         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
128     }
129
130     final synchronized void doStart() {
131         phase = new Running();
132         scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
133     }
134
135     private void execute() {
136         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
137         if (elapsed < runtimeNanos) {
138             // Not completed yet: create a transaction and hook it up
139             final long txId = txCounter++;
140             final ListenableFuture<Void> execFuture = execWrite(txId);
141
142             // Ordering is important: we need to add the future before hooking the callback
143             synchronized (this) {
144                 ((Running) phase).addFuture(execFuture);
145             }
146             Futures.addCallback(execFuture, new FutureCallback<Void>() {
147                 @Override
148                 public void onSuccess(final Void result) {
149                     txSuccess(execFuture, txId);
150                 }
151
152                 @Override
153                 public void onFailure(final Throwable cause) {
154                     txFailure(execFuture, txId, cause);
155                 }
156             });
157         } else {
158             startCollection();
159         }
160     }
161
162     private synchronized void startCollection() {
163         scheduledFuture.cancel(false);
164
165         final Running running = (Running) phase;
166         final Optional<Throwable> failure = running.getFailure();
167         if (failure.isPresent()) {
168             executor.shutdown();
169             runFailed(failure.get());
170             return;
171         }
172
173         LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
174         if (running.futures.isEmpty()) {
175             executor.shutdown();
176             runSuccessful(txCounter);
177             return;
178         }
179
180         phase = new Collecting(running.futures);
181         executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
182         executor.shutdown();
183     }
184
185     final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
186         LOG.debug("Future #{} completed successfully", txId);
187         phase.txSuccess(execFuture, txId);
188     }
189
190     final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
191         LOG.debug("Future #{} failed", txId, cause);
192         phase.txFailure(execFuture, txId, cause);
193     }
194
195     private synchronized void checkCollection() {
196         final Collecting collecting = (Collecting) phase;
197         if (!collecting.done) {
198             final int size = collecting.futures.size();
199             for (int i = 0; i < size; i++) {
200                 final ListenableFuture<Void> future = collecting.futures.get(i);
201
202                 try {
203                     future.get(0, TimeUnit.NANOSECONDS);
204                 } catch (final TimeoutException e) {
205                     LOG.warn("Future #{}/{} not completed yet", i, size);
206                 } catch (final ExecutionException e) {
207                     LOG.warn("Future #{}/{} failed", i, size, e.getCause());
208                 } catch (final InterruptedException e) {
209                     LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
210                 }
211             }
212
213             runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
214         }
215     }
216
217     abstract ListenableFuture<Void> execWrite(final long txId);
218
219     abstract void runFailed(Throwable cause);
220
221     abstract void runSuccessful(long allTx);
222
223     abstract void runTimedOut(Exception cause);
224 }