private CountDownLatch waitingLatch;
private Status retStatus;
private static final Logger logger = LoggerFactory.getLogger(FlowEntryDistributionOrderFutureTask.class);
+ // Don't wait forever to program, rather timeout if there are issues, and
+ // log an error
+ private long timeout;
+ private static final Long DEFAULTTIMEOUT = 30000L;
/**
* @param order
this.waitingLatch = new CountDownLatch(1);
// No return status yet!
this.retStatus = new Status(StatusCode.UNDEFINED);
+ // Set the timeout
+ String strTimeout = System.getProperty("FlowEntryDistributionOrderFutureTask.timeout",
+ DEFAULTTIMEOUT.toString());
+ try {
+ timeout = Long.parseLong(strTimeout);
+ } catch (Exception e) {
+ timeout = DEFAULTTIMEOUT;
+ }
}
@Override
@Override
public Status get() throws InterruptedException, ExecutionException {
+ boolean didFinish = false;
logger.trace("Getting status for order {}", this.order);
// If i'm done lets return the status as many times as caller wants
if (this.waitingLatch.getCount() == 0L) {
logger.trace("Start waiting for status to come back");
// Wait till someone signal that we are done
- this.waitingLatch.await();
+ didFinish = this.waitingLatch.await(this.timeout, TimeUnit.MILLISECONDS);
- logger.trace("Waiting for the status is over, returning it");
- // Return the known status
- return retStatus;
+ if (didFinish) {
+ logger.trace("Waiting for the status of order {} is over, returning it", this.order);
+ // Return the known status
+ return retStatus;
+ } else {
+ logger.error("Timing out, the workStatus for order {} has not come back in time!", this.order);
+ return new Status(StatusCode.TIMEOUT);
+ }
}
@Override
public Status get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
+ boolean didFinish = false;
logger.trace("Getting status for order {}", this.order);
// If i'm done lets return the status as many times as caller wants
if (this.waitingLatch.getCount() == 0L) {
logger.trace("Start waiting for status to come back");
// Wait till someone signal that we are done
- this.waitingLatch.await(timeout, unit);
+ didFinish = this.waitingLatch.await(timeout, unit);
- logger.trace("Waiting for the status is over, returning it");
- // Return the known status, could also be null if didn't return
- return retStatus;
+ if (didFinish) {
+ logger.trace("Waiting for the status is over, returning it");
+ // Return the known status, could also be null if didn't return
+ return retStatus;
+ } else {
+ // No need to bark here as long as this routine could indeed
+ // timeout
+ logger.trace("Timing out, the workStatus for order {} has not come back in time!", this.order);
+ return new Status(StatusCode.TIMEOUT);
+ }
}
@Override
this.waitingLatch.countDown();
logger.trace("Unlocked the Future");
}
+
+ /**
+ * Getter for the workOrder for which the order is waiting for
+ * @return the order
+ */
+ public FlowEntryDistributionOrder getOrder() {
+ return order;
+ }
}
* @return a Future object for monitoring the progress of the result, or
* null in case the processing should take place locally
*/
- private Future<Status> distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u,
+ UpdateType t) {
// A null entry it's an unexpected condition, anyway it's safe to keep
// the handling local
if (e == null) {
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
+ FlowEntryDistributionOrderFutureTask futureStatus =
+ distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
* contain the unique id assigned to this request
*/
private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
*/
if (fe.getRequestorController()
.equals(clusterContainerService.getMyAddress())) {
- FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe);
+ FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe);
if (fet != null) {
logsync.trace("workStatus response is for us {}", fe);
// Signal we got the status