From 4c398e69ab1ea5523ccc7b73abdfd19a8e6bc3f2 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Tue, 14 Oct 2025 09:32:52 +0100 Subject: [PATCH 1/4] ignore ping frame when connection is closed while it's valid to process ping frames after sending/receiving goaway frames, preparing an AC frame may inadvertedly put bytes on the user buffer, thereby signaling that there's something to write back instead of terminating the connection --- lib/http/2/connection.rb | 2 -- spec/shared_examples/connection.rb | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/http/2/connection.rb b/lib/http/2/connection.rb index d3cf180..1b97d3e 100644 --- a/lib/http/2/connection.rb +++ b/lib/http/2/connection.rb @@ -527,8 +527,6 @@ def connection_management(frame) when :goaway # 6.8. GOAWAY # An endpoint MAY send multiple GOAWAY frames if circumstances change. - when :ping - ping_management(frame) else connection_error if (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @closed_since) > 15 end diff --git a/spec/shared_examples/connection.rb b/spec/shared_examples/connection.rb index a2e9f20..b43683f 100644 --- a/spec/shared_examples/connection.rb +++ b/spec/shared_examples/connection.rb @@ -135,6 +135,16 @@ expect(pong).to eq "12345678" end + it "should not fire callback on PONG if connection is closed" do + conn << f.generate(settings_frame) + conn << f.generate(goaway_frame) + + pong = nil + conn.ping("12345678") { |d| pong = d } + conn << f.generate(pong_frame) + expect(pong).to be_nil + end + it "should fire callback on receipt of GOAWAY" do last_stream, payload, error = nil conn << f.generate(settings_frame) From 071b5755bc79009cccec94edf8bab649209e6b9a Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Tue, 14 Oct 2025 10:37:20 +0100 Subject: [PATCH 2/4] example server: do not delay closing socket on goaway frame this causes the truffleruby build to fail, as it expects to either receive a ping ack or have the connection closed --- example/server.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/example/server.rb b/example/server.rb index cb97d83..555326a 100644 --- a/example/server.rb +++ b/example/server.rb @@ -62,10 +62,7 @@ end conn.on(:goaway) do - Thread.start do - sleep(1) - sock.close - end + sock.close end conn.on(:stream) do |stream| From e197bbbdcb8f03e07fe6823c8973fffa53ec659d Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Wed, 10 Dec 2025 00:17:45 +0000 Subject: [PATCH 3/4] eager-decode-then reorder received frames the immediate benefit of eager-decoding frames is that one can push PING frames to the top of the stack, thereby prioritizing its processing. --- lib/http/2/framer.rb | 18 ++++++++++++++++-- sig/framer.rbs | 5 ++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/http/2/framer.rb b/lib/http/2/framer.rb index b09c118..6ab0c11 100644 --- a/lib/http/2/framer.rb +++ b/lib/http/2/framer.rb @@ -119,6 +119,7 @@ def initialize(local_max_frame_size = DEFAULT_MAX_FRAME_SIZE, remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE) @local_max_frame_size = local_max_frame_size @remote_max_frame_size = remote_max_frame_size + @frames = [] end # Generates common 9-byte frame header. @@ -371,6 +372,21 @@ def generate(frame) # # @param buf [Buffer] def parse(buf) + while (frame = decode_frame(buf)) + if frame[:type] == :ping + # PING responses SHOULD be given higher priority than any other frame. + @frames.unshift(frame) + else + @frames << frame + end + end + + @frames.shift + end + + private + + def decode_frame(buf) return if buf.size < 9 frame = read_common_header(buf) @@ -491,8 +507,6 @@ def parse(buf) frame end - private - def pack_error(error, buffer:) unless error.is_a? Integer error = DEFINED_ERRORS[error] diff --git a/sig/framer.rbs b/sig/framer.rbs index 5205990..ffc66b2 100644 --- a/sig/framer.rbs +++ b/sig/framer.rbs @@ -34,6 +34,7 @@ module HTTP2 @local_max_frame_size: Integer @remote_max_frame_size: Integer + @streams: Hash[Integer, Stream] attr_accessor local_max_frame_size: Integer @@ -47,12 +48,14 @@ module HTTP2 def generate: (frame) -> String - def parse: (String) -> frame? + def parse: (String buf) -> frame? private def initialize: (?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped + def decode_frame: (String buf) -> frame? + def pack_error: (Integer | Symbol error, buffer: String) -> String def unpack_error: (Integer) -> (Symbol | Integer) From 5406ed4729a7f8bfdd571b22c373de0f75667e81 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Wed, 10 Dec 2025 00:22:42 +0000 Subject: [PATCH 4/4] reorder decoded frames according to stream priorization rules --- lib/http/2/connection.rb | 2 +- lib/http/2/framer.rb | 25 ++++++++++++++++++-- lib/http/2/stream.rb | 33 +++++++++++++++++++++++++- sig/framer.rbs | 3 ++- sig/stream.rbs | 3 +++ spec/stream_spec.rb | 51 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 112 insertions(+), 5 deletions(-) diff --git a/lib/http/2/connection.rb b/lib/http/2/connection.rb index 1b97d3e..b06372b 100644 --- a/lib/http/2/connection.rb +++ b/lib/http/2/connection.rb @@ -93,7 +93,7 @@ def initialize(settings = {}) @streams_recently_closed = {} @pending_settings = [] - @framer = Framer.new(@local_settings[:settings_max_frame_size]) + @framer = Framer.new(@streams, @local_settings[:settings_max_frame_size]) @local_window_limit = @local_settings[:settings_initial_window_size] @local_window = @local_window_limit diff --git a/lib/http/2/framer.rb b/lib/http/2/framer.rb index 6ab0c11..0a426f6 100644 --- a/lib/http/2/framer.rb +++ b/lib/http/2/framer.rb @@ -115,8 +115,12 @@ class Framer # Initializes new framer object. # - def initialize(local_max_frame_size = DEFAULT_MAX_FRAME_SIZE, - remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE) + def initialize( + streams = {}, + local_max_frame_size = DEFAULT_MAX_FRAME_SIZE, + remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE + ) + @streams = streams @local_max_frame_size = local_max_frame_size @remote_max_frame_size = remote_max_frame_size @frames = [] @@ -372,6 +376,8 @@ def generate(frame) # # @param buf [Buffer] def parse(buf) + decoded = false + while (frame = decode_frame(buf)) if frame[:type] == :ping # PING responses SHOULD be given higher priority than any other frame. @@ -379,6 +385,21 @@ def parse(buf) else @frames << frame end + decoded = true + end + + # TODO: support stream prioritization + # WIP + if decoded + @frames.sort! do |f1, f2| + next(0) unless f1.key?(:stream) && f2.key?(:stream) + + s1 = @streams[f1[:stream]] or next(0) + + s2 = @streams[f2[:stream]] or next(0) + + s1 <=> s2 + end end @frames.shift diff --git a/lib/http/2/stream.rb b/lib/http/2/stream.rb index 796f3b5..de9c529 100644 --- a/lib/http/2/stream.rb +++ b/lib/http/2/stream.rb @@ -53,7 +53,14 @@ class Stream # Stream priority as set by initiator. attr_reader :weight - attr_reader :dependency, :remote_window + + # whether the stream is exclusive in the dependency tree + attr_reader :exclusive + + # the parent stream + attr_reader :dependency + + attr_reader :remote_window # Size of current stream flow control window. attr_reader :local_window @@ -82,6 +89,7 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa @id = id @weight = weight @dependency = dependency + @exclusive = exclusive # from mixins @listeners = Hash.new { |hash, key| hash[key] = [] } @@ -104,6 +112,28 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa on(:local_window) { |v| @local_window_max_size = @local_window = v } end + def <=>(other) + if !@dependency.zero? + if @dependency == other.id + # parent stream processed before + return 1 + elsif @dependency == other.dependency + if @exclusive + # exclusive streams from the same dep come first + return -1 + elsif other.exclusive + return 1 + else + return other.weight <=> @weight + end + end + elsif !other.dependency.zero? + return -1 if @id == other.dependency + end + + other.weight <=> @weight + end + def closed? @state == :closed end @@ -647,6 +677,7 @@ def complete_transition(frame) def process_priority(frame) @weight = frame[:weight] @dependency = frame[:dependency] + @exclusive = frame[:exclusive] emit( :priority, weight: frame[:weight], diff --git a/sig/framer.rbs b/sig/framer.rbs index ffc66b2..39d7282 100644 --- a/sig/framer.rbs +++ b/sig/framer.rbs @@ -35,6 +35,7 @@ module HTTP2 @local_max_frame_size: Integer @remote_max_frame_size: Integer @streams: Hash[Integer, Stream] + @frames: Array[frame] attr_accessor local_max_frame_size: Integer @@ -52,7 +53,7 @@ module HTTP2 private - def initialize: (?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped + def initialize: (?Hash[Integer, Stream] streams, ?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped def decode_frame: (String buf) -> frame? diff --git a/sig/stream.rbs b/sig/stream.rbs index 3f1864c..ea30927 100644 --- a/sig/stream.rbs +++ b/sig/stream.rbs @@ -10,6 +10,7 @@ module HTTP2 attr_reader parent: Stream? attr_reader weight: Integer attr_reader dependency: Integer + attr_reader exclusive: bool attr_reader remote_window: Integer attr_reader local_window: Integer attr_reader closed: Symbol? @@ -33,6 +34,8 @@ module HTTP2 alias << receive + def <=>: (Stream other) -> Integer + def verify_trailers: (headers_frame frame) -> void def calculate_content_length: (Integer?) -> void diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index 692d36f..15bd4d9 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -662,6 +662,57 @@ end end + context "prioritization" do + it "should order based on dependency" do + # 5.3.1 + # if streams B and C are dependent on stream A, and if stream + # D is created with a dependency on stream A, this results in a + # dependency order of A followed by B, C, and D in any order. + a_stream = client.new_stream + b_stream = client.new_stream(dependency: a_stream.id) + c_stream = client.new_stream(dependency: a_stream.id) + d_stream = client.new_stream(dependency: a_stream.id) + + expect([b_stream, c_stream, d_stream, a_stream].sort).to eq( + [a_stream, b_stream, c_stream, d_stream] + ) + expect([b_stream, d_stream, c_stream, a_stream].sort).to eq( + [a_stream, b_stream, d_stream, c_stream] + ) + end + + it "should push exclusive streams up the stack" do + # he exclusive flag causes the stream to become the + # sole dependency of its parent stream, causing other dependencies to + # become dependent on the exclusive stream. In the previous example, + # if stream D is created with an exclusive dependency on stream A, this + # results in D becoming the dependency parent of B and C. + a_stream = client.new_stream + b_stream = client.new_stream(dependency: a_stream.id) + c_stream = client.new_stream(dependency: a_stream.id) + d_stream = client.new_stream(dependency: a_stream.id, exclusive: true) + + expect([b_stream, c_stream, d_stream, a_stream].sort).to eq( + [a_stream, d_stream, b_stream, c_stream] + ) + end + + it "should prioritze based on weight" do + # Streams with the same parent SHOULD be allocated resources + # proportionally based on their weight. Thus, if stream B depends on + # stream A with weight 4, stream C depends on stream A with weight 12, + # and no progress can be made on stream A, stream B ideally receives + # one-third of the resources allocated to stream C. + a_stream = client.new_stream + b_stream = client.new_stream(dependency: a_stream.id, weight: 4) + c_stream = client.new_stream(dependency: a_stream.id, weight: 12) + + expect([b_stream, c_stream, a_stream].sort).to eq( + [a_stream, c_stream, b_stream] + ) + end + end + context "client API" do it ".reprioritize should emit PRIORITY frame" do expect(stream).to receive(:send) do |frame|