7dd329196efdfa7758698e776ae0dc6f196ec2fa
[netvirt.git] / ipv6service / impl / src / main / java / org / opendaylight / netvirt / ipv6service / utils / Ipv6PeriodicTrQueue.java
1 /*
2  * Copyright (c) 2016 Dell Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.ipv6service.utils;
10
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.function.Consumer;
17 import org.checkerframework.checker.lock.qual.GuardedBy;
18 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 public class Ipv6PeriodicTrQueue implements AutoCloseable {
23     private static final Logger LOG = LoggerFactory.getLogger(Ipv6PeriodicTrQueue.class);
24
25     private final Consumer<Uuid> onMessage;
26     private final ConcurrentLinkedQueue<Uuid> ipv6PeriodicQueue = new ConcurrentLinkedQueue<>();
27     private final Thread transmitterThread = new Thread(this::threadRunLoop);
28     private final ReentrantLock queueLock = new ReentrantLock();
29     private final Condition queueCondition = queueLock.newCondition();
30     private volatile boolean closed;
31
32     @GuardedBy("queueLock")
33     private boolean isMessageAvailable;
34
35     public Ipv6PeriodicTrQueue(Consumer<Uuid> onMessage) {
36         this.onMessage = onMessage;
37         init();
38     }
39
40     public void init() {
41         transmitterThread.start();
42
43         LOG.info("Started the ipv6 periodic RA transmission thread");
44     }
45
46     @Override
47     public void close() {
48         queueLock.lock();
49         try {
50             closed = true;
51             queueCondition.signalAll();
52         } finally {
53             queueLock.unlock();
54         }
55     }
56
57     public void addMessage(Uuid portId) {
58         ipv6PeriodicQueue.add(portId);
59
60         queueLock.lock();
61         try {
62             isMessageAvailable = true;
63             queueCondition.signalAll();
64         } finally {
65             queueLock.unlock();
66         }
67     }
68
69     // Suppress "Exceptional return value of java.util.concurrent.locks.Condition.await" - we really don't care
70     // if the Condition was signaled or timed out as we use isMessageAvailable to break or continue waiting.
71     @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
72     private void threadRunLoop() {
73         while (!closed) {
74             while (!ipv6PeriodicQueue.isEmpty()) {
75                 Uuid portId = ipv6PeriodicQueue.poll();
76                 LOG.debug("timeout got for port {}", portId);
77                 onMessage.accept(portId);
78             }
79
80             queueLock.lock();
81             try {
82                 while (!isMessageAvailable && !closed) {
83                     queueCondition.await(1, TimeUnit.SECONDS);
84                 }
85                 isMessageAvailable = false;
86             } catch (InterruptedException e) {
87                 LOG.debug("threadRunLoop interrupted", e);
88             } finally {
89                 queueLock.unlock();
90             }
91         }
92     }
93 }