// Copyright 2025 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package http2 import ( "fmt" "math" ) type streamMetadata struct { location *writeQueue priority PriorityParam } type priorityWriteSchedulerRFC9218 struct { // control contains control frames (SETTINGS, PING, etc.). control writeQueue // heads contain the head of a circular list of streams. // We put these heads within a nested array that represents urgency and // incremental, as defined in // https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters. // 8 represents u=0 up to u=7, and 2 represents i=false and i=true. heads [8][2]*writeQueue // streams contains a mapping between each stream ID and their metadata, so // we can quickly locate them when needing to, for example, adjust their // priority. streams map[uint32]streamMetadata // queuePool are empty queues for reuse. queuePool writeQueuePool // prioritizeIncremental is used to determine whether we should prioritize // incremental streams or not, when urgency is the same in a given Pop() // call. prioritizeIncremental bool // priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we // receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame. priorityUpdateBuf struct { // streamID being 0 means that the buffer is empty. This is a safe // assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR. streamID uint32 priority PriorityParam } } func newPriorityWriteSchedulerRFC9218() WriteScheduler { ws := &priorityWriteSchedulerRFC9218{ streams: make(map[uint32]streamMetadata), } return ws } func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) { if ws.streams[streamID].location != nil { panic(fmt.Errorf("stream %d already opened", streamID)) } if streamID == ws.priorityUpdateBuf.streamID { ws.priorityUpdateBuf.streamID = 0 opt.priority = ws.priorityUpdateBuf.priority } q := ws.queuePool.get() ws.streams[streamID] = streamMetadata{ location: q, priority: opt.priority, } u, i := opt.priority.urgency, opt.priority.incremental if ws.heads[u][i] == nil { ws.heads[u][i] = q q.next = q q.prev = q } else { // Queues are stored in a ring. // Insert the new stream before ws.head, putting it at the end of the list. q.prev = ws.heads[u][i].prev q.next = ws.heads[u][i] q.prev.next = q q.next.prev = q } } func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) { metadata := ws.streams[streamID] q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental if q == nil { return } if q.next == q { // This was the only open stream. ws.heads[u][i] = nil } else { q.prev.next = q.next q.next.prev = q.prev if ws.heads[u][i] == q { ws.heads[u][i] = q.next } } delete(ws.streams, streamID) ws.queuePool.put(q) } func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) { metadata := ws.streams[streamID] q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental if q == nil { ws.priorityUpdateBuf.streamID = streamID ws.priorityUpdateBuf.priority = priority return } // Remove stream from current location. if q.next == q { // This was the only open stream. ws.heads[u][i] = nil } else { q.prev.next = q.next q.next.prev = q.prev if ws.heads[u][i] == q { ws.heads[u][i] = q.next } } // Insert stream to the new queue. u, i = priority.urgency, priority.incremental if ws.heads[u][i] == nil { ws.heads[u][i] = q q.next = q q.prev = q } else { // Queues are stored in a ring. // Insert the new stream before ws.head, putting it at the end of the list. q.prev = ws.heads[u][i].prev q.next = ws.heads[u][i] q.prev.next = q q.next.prev = q } // Update the metadata. ws.streams[streamID] = streamMetadata{ location: q, priority: priority, } } func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) { if wr.isControl() { ws.control.push(wr) return } q := ws.streams[wr.StreamID()].location if q == nil { // This is a closed stream. // wr should not be a HEADERS or DATA frame. // We push the request onto the control queue. if wr.DataSize() > 0 { panic("add DATA on non-open stream") } ws.control.push(wr) return } q.push(wr) } func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) { // Control and RST_STREAM frames first. if !ws.control.empty() { return ws.control.shift(), true } // On the next Pop(), we want to prioritize incremental if we prioritized // non-incremental request of the same urgency this time. Vice-versa. // i.e. when there are incremental and non-incremental requests at the same // priority, we give 50% of our bandwidth to the incremental ones in // aggregate and 50% to the first non-incremental one (since // non-incremental streams do not use round-robin writes). ws.prioritizeIncremental = !ws.prioritizeIncremental // Always prioritize lowest u (i.e. highest urgency level). for u := range ws.heads { for i := range ws.heads[u] { // When we want to prioritize incremental, we try to pop i=true // first before i=false when u is the same. if ws.prioritizeIncremental { i = (i + 1) % 2 } q := ws.heads[u][i] if q == nil { continue } for { if wr, ok := q.consume(math.MaxInt32); ok { if i == 1 { // For incremental streams, we update head to q.next so // we can round-robin between multiple streams that can // immediately benefit from partial writes. ws.heads[u][i] = q.next } else { // For non-incremental streams, we try to finish one to // completion rather than doing round-robin. However, // we update head here so that if q.consume() is !ok // (e.g. the stream has no more frame to consume), head // is updated to the next q that has frames to consume // on future iterations. This way, we do not prioritize // writing to unavailable stream on next Pop() calls, // preventing head-of-line blocking. ws.heads[u][i] = q } return wr, true } q = q.next if q == ws.heads[u][i] { break } } } } return FrameWriteRequest{}, false }