Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions example/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@
end

conn.on(:goaway) do
Thread.start do
sleep(1)
sock.close
end
sock.close
end

conn.on(:stream) do |stream|
Expand Down
4 changes: 1 addition & 3 deletions lib/http/2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def initialize(settings = {})
@streams_recently_closed = {}
@pending_settings = []

@framer = Framer.new(@local_settings[:settings_max_frame_size])
@framer = Framer.new(@streams, @local_settings[:settings_max_frame_size])

@local_window_limit = @local_settings[:settings_initial_window_size]
@local_window = @local_window_limit
Expand Down Expand Up @@ -527,8 +527,6 @@ def connection_management(frame)
when :goaway
# 6.8. GOAWAY
# An endpoint MAY send multiple GOAWAY frames if circumstances change.
when :ping
ping_management(frame)
else
connection_error if (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @closed_since) > 15
end
Expand Down
43 changes: 39 additions & 4 deletions lib/http/2/framer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ class Framer

# Initializes new framer object.
#
def initialize(local_max_frame_size = DEFAULT_MAX_FRAME_SIZE,
remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE)
def initialize(
streams = {},
local_max_frame_size = DEFAULT_MAX_FRAME_SIZE,
remote_max_frame_size = DEFAULT_MAX_FRAME_SIZE
)
@streams = streams
@local_max_frame_size = local_max_frame_size
@remote_max_frame_size = remote_max_frame_size
@frames = []
end

# Generates common 9-byte frame header.
Expand Down Expand Up @@ -371,6 +376,38 @@ def generate(frame)
#
# @param buf [Buffer]
def parse(buf)
decoded = false

while (frame = decode_frame(buf))
if frame[:type] == :ping
# PING responses SHOULD be given higher priority than any other frame.
@frames.unshift(frame)
else
@frames << frame
end
decoded = true
end

# TODO: support stream prioritization
# WIP
if decoded
@frames.sort! do |f1, f2|
next(0) unless f1.key?(:stream) && f2.key?(:stream)

s1 = @streams[f1[:stream]] or next(0)

s2 = @streams[f2[:stream]] or next(0)

s1 <=> s2
end
end

@frames.shift
end

private

def decode_frame(buf)
return if buf.size < 9

frame = read_common_header(buf)
Expand Down Expand Up @@ -491,8 +528,6 @@ def parse(buf)
frame
end

private

def pack_error(error, buffer:)
unless error.is_a? Integer
error = DEFINED_ERRORS[error]
Expand Down
33 changes: 32 additions & 1 deletion lib/http/2/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ class Stream

# Stream priority as set by initiator.
attr_reader :weight
attr_reader :dependency, :remote_window

# whether the stream is exclusive in the dependency tree
attr_reader :exclusive

# the parent stream
attr_reader :dependency

attr_reader :remote_window

# Size of current stream flow control window.
attr_reader :local_window
Expand Down Expand Up @@ -82,6 +89,7 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa
@id = id
@weight = weight
@dependency = dependency
@exclusive = exclusive

# from mixins
@listeners = Hash.new { |hash, key| hash[key] = [] }
Expand All @@ -104,6 +112,28 @@ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, pa
on(:local_window) { |v| @local_window_max_size = @local_window = v }
end

def <=>(other)
if !@dependency.zero?
if @dependency == other.id
# parent stream processed before
return 1
elsif @dependency == other.dependency
if @exclusive
# exclusive streams from the same dep come first
return -1
elsif other.exclusive
return 1
else
return other.weight <=> @weight
end
end
elsif !other.dependency.zero?
return -1 if @id == other.dependency
end

other.weight <=> @weight
end

def closed?
@state == :closed
end
Expand Down Expand Up @@ -647,6 +677,7 @@ def complete_transition(frame)
def process_priority(frame)
@weight = frame[:weight]
@dependency = frame[:dependency]
@exclusive = frame[:exclusive]
emit(
:priority,
weight: frame[:weight],
Expand Down
8 changes: 6 additions & 2 deletions sig/framer.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ module HTTP2

@local_max_frame_size: Integer
@remote_max_frame_size: Integer
@streams: Hash[Integer, Stream]
@frames: Array[frame]

attr_accessor local_max_frame_size: Integer

Expand All @@ -47,11 +49,13 @@ module HTTP2

def generate: (frame) -> String

def parse: (String) -> frame?
def parse: (String buf) -> frame?

private

def initialize: (?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped
def initialize: (?Hash[Integer, Stream] streams, ?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped

def decode_frame: (String buf) -> frame?

def pack_error: (Integer | Symbol error, buffer: String) -> String

Expand Down
3 changes: 3 additions & 0 deletions sig/stream.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module HTTP2
attr_reader parent: Stream?
attr_reader weight: Integer
attr_reader dependency: Integer
attr_reader exclusive: bool
attr_reader remote_window: Integer
attr_reader local_window: Integer
attr_reader closed: Symbol?
Expand All @@ -33,6 +34,8 @@ module HTTP2

alias << receive

def <=>: (Stream other) -> Integer

def verify_trailers: (headers_frame frame) -> void

def calculate_content_length: (Integer?) -> void
Expand Down
10 changes: 10 additions & 0 deletions spec/shared_examples/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@
expect(pong).to eq "12345678"
end

it "should not fire callback on PONG if connection is closed" do
conn << f.generate(settings_frame)
conn << f.generate(goaway_frame)

pong = nil
conn.ping("12345678") { |d| pong = d }
conn << f.generate(pong_frame)
expect(pong).to be_nil
end

it "should fire callback on receipt of GOAWAY" do
last_stream, payload, error = nil
conn << f.generate(settings_frame)
Expand Down
51 changes: 51 additions & 0 deletions spec/stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,57 @@
end
end

context "prioritization" do
it "should order based on dependency" do
# 5.3.1
# if streams B and C are dependent on stream A, and if stream
# D is created with a dependency on stream A, this results in a
# dependency order of A followed by B, C, and D in any order.
a_stream = client.new_stream
b_stream = client.new_stream(dependency: a_stream.id)
c_stream = client.new_stream(dependency: a_stream.id)
d_stream = client.new_stream(dependency: a_stream.id)

expect([b_stream, c_stream, d_stream, a_stream].sort).to eq(
[a_stream, b_stream, c_stream, d_stream]
)
expect([b_stream, d_stream, c_stream, a_stream].sort).to eq(
[a_stream, b_stream, d_stream, c_stream]
)
end

it "should push exclusive streams up the stack" do
# he exclusive flag causes the stream to become the
# sole dependency of its parent stream, causing other dependencies to
# become dependent on the exclusive stream. In the previous example,
# if stream D is created with an exclusive dependency on stream A, this
# results in D becoming the dependency parent of B and C.
a_stream = client.new_stream
b_stream = client.new_stream(dependency: a_stream.id)
c_stream = client.new_stream(dependency: a_stream.id)
d_stream = client.new_stream(dependency: a_stream.id, exclusive: true)

expect([b_stream, c_stream, d_stream, a_stream].sort).to eq(
[a_stream, d_stream, b_stream, c_stream]
)
end

it "should prioritze based on weight" do
# Streams with the same parent SHOULD be allocated resources
# proportionally based on their weight. Thus, if stream B depends on
# stream A with weight 4, stream C depends on stream A with weight 12,
# and no progress can be made on stream A, stream B ideally receives
# one-third of the resources allocated to stream C.
a_stream = client.new_stream
b_stream = client.new_stream(dependency: a_stream.id, weight: 4)
c_stream = client.new_stream(dependency: a_stream.id, weight: 12)

expect([b_stream, c_stream, a_stream].sort).to eq(
[a_stream, c_stream, b_stream]
)
end
end

context "client API" do
it ".reprioritize should emit PRIORITY frame" do
expect(stream).to receive(:send) do |frame|
Expand Down
Loading