332f318de45fc677b39d00ac9245689361de5051
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractStackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.Channel;
13 import java.util.ArrayList;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
17 import javax.annotation.Nonnull;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 abstract class AbstractStackedOutboundQueue implements OutboundQueue {
26
27     private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
28
29     protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater
30             .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
31
32     @GuardedBy("unflushedSegments")
33     protected volatile StackedSegment firstSegment;
34     @GuardedBy("unflushedSegments")
35     protected final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
36     @GuardedBy("unflushedSegments")
37     protected final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
38
39     private volatile long lastXid = -1;
40
41     @GuardedBy("unflushedSegments")
42     protected Integer shutdownOffset;
43
44     // Accessed from Netty only
45     protected int flushOffset;
46
47     protected final AbstractOutboundQueueManager<?, ?> manager;
48
49     AbstractStackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
50         this.manager = Preconditions.checkNotNull(manager);
51         firstSegment = StackedSegment.create(0L);
52         uncompletedSegments.add(firstSegment);
53         unflushedSegments.add(firstSegment);
54     }
55
56     /**
57      * Write some entries from the queue to the channel. Guaranteed to run
58      * in the corresponding EventLoop.
59      *
60      * @param channel Channel onto which we are writing
61      * @param now
62      * @return Number of entries written out
63      */
64     abstract int writeEntries(@Nonnull final Channel channel, final long now);
65
66     abstract boolean pairRequest(final OfHeader message);
67
68     boolean needsFlush() {
69         // flushOffset always points to the first entry, which can be changed only
70         // from Netty, so we are fine here.
71         if (firstSegment.getBaseXid() + flushOffset > lastXid) {
72             return false;
73         }
74
75         if (shutdownOffset != null && flushOffset >= shutdownOffset) {
76             return false;
77         }
78
79         return firstSegment.getEntry(flushOffset).isCommitted();
80     }
81
82     long startShutdown(final Channel channel) {
83         /*
84          * We are dealing with a multi-threaded shutdown, as the user may still
85          * be reserving entries in the queue. We are executing in a netty thread,
86          * so neither flush nor barrier can be running, which is good news.
87          * We will eat up all the slots in the queue here and mark the offset first
88          * reserved offset and free up all the cached queues. We then schedule
89          * the flush task, which will deal with the rest of the shutdown process.
90          */
91         synchronized (unflushedSegments) {
92             // Increment the offset by the segment size, preventing fast path allocations,
93             // since we are holding the slow path lock, any reservations will see the queue
94             // in shutdown and fail accordingly.
95             final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
96             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
97
98             return lockedShutdownFlush();
99         }
100     }
101
102     boolean finishShutdown() {
103         synchronized (unflushedSegments) {
104             lockedShutdownFlush();
105         }
106
107         return !needsFlush();
108     }
109
110     @GuardedBy("unflushedSegments")
111     private long lockedShutdownFlush() {
112         long entries = 0;
113
114         // Fail all queues
115         final Iterator<StackedSegment> it = uncompletedSegments.iterator();
116         while (it.hasNext()) {
117             final StackedSegment segment = it.next();
118
119             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
120             if (segment.isComplete()) {
121                 LOG.trace("Cleared segment {}", segment);
122                 it.remove();
123             }
124         }
125
126         return entries;
127     }
128 }