2 * Copyright (c) 2016 Dell Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.ipv6service.utils;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.function.Consumer;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 public class Ipv6PeriodicTrQueue implements AutoCloseable {
24 private static final Logger LOG = LoggerFactory.getLogger(Ipv6PeriodicTrQueue.class);
26 private final Consumer<Uuid> onMessage;
27 private final ConcurrentLinkedQueue<Uuid> ipv6PeriodicQueue = new ConcurrentLinkedQueue<>();
28 private final Thread transmitterThread = new Thread(this::threadRunLoop);
29 private final ReentrantLock queueLock = new ReentrantLock();
30 private final Condition queueCondition = queueLock.newCondition();
31 private volatile boolean closed;
33 @GuardedBy("queueLock")
34 private boolean isMessageAvailable;
36 public Ipv6PeriodicTrQueue(Consumer<Uuid> onMessage) {
37 this.onMessage = onMessage;
42 transmitterThread.start();
44 LOG.info("Started the ipv6 periodic RA transmission thread");
52 queueCondition.signalAll();
58 public void addMessage(Uuid portId) {
59 ipv6PeriodicQueue.add(portId);
63 isMessageAvailable = true;
64 queueCondition.signalAll();
70 // Suppress "Exceptional return value of java.util.concurrent.locks.Condition.await" - we really don't care
71 // if the Condition was signaled or timed out as we use isMessageAvailable to break or continue waiting.
72 @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
73 private void threadRunLoop() {
75 while (!ipv6PeriodicQueue.isEmpty()) {
76 Uuid portId = ipv6PeriodicQueue.poll();
77 LOG.debug("timeout got for port {}", portId);
78 onMessage.accept(portId);
83 while (!isMessageAvailable && !closed) {
84 queueCondition.await(1, TimeUnit.SECONDS);
86 isMessageAvailable = false;
87 } catch (InterruptedException e) {
88 LOG.debug("threadRunLoop interrupted", e);