2 * Copyright (c) 2016 Dell Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.ipv6service.utils;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.function.Consumer;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 public class Ipv6PeriodicTrQueue implements AutoCloseable {
23 private static final Logger LOG = LoggerFactory.getLogger(Ipv6PeriodicTrQueue.class);
25 private final Consumer<Uuid> onMessage;
26 private final ConcurrentLinkedQueue<Uuid> ipv6PeriodicQueue = new ConcurrentLinkedQueue<>();
27 private final Thread transmitterThread = new Thread(this::threadRunLoop);
28 private final ReentrantLock queueLock = new ReentrantLock();
29 private final Condition queueCondition = queueLock.newCondition();
30 private volatile boolean closed;
32 @GuardedBy("queueLock")
33 private boolean isMessageAvailable;
35 public Ipv6PeriodicTrQueue(Consumer<Uuid> onMessage) {
36 this.onMessage = onMessage;
41 transmitterThread.start();
43 LOG.info("Started the ipv6 periodic RA transmission thread");
51 queueCondition.signalAll();
57 public void addMessage(Uuid portId) {
58 ipv6PeriodicQueue.add(portId);
62 isMessageAvailable = true;
63 queueCondition.signalAll();
69 // Suppress "Exceptional return value of java.util.concurrent.locks.Condition.await" - we really don't care
70 // if the Condition was signaled or timed out as we use isMessageAvailable to break or continue waiting.
71 @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
72 private void threadRunLoop() {
74 while (!ipv6PeriodicQueue.isEmpty()) {
75 Uuid portId = ipv6PeriodicQueue.poll();
76 LOG.debug("timeout got for port {}", portId);
77 onMessage.accept(portId);
82 while (!isMessageAvailable && !closed) {
83 queueCondition.await(1, TimeUnit.SECONDS);
85 isMessageAvailable = false;
86 } catch (InterruptedException e) {
87 LOG.debug("threadRunLoop interrupted", e);