Skip to content

Commit ec52beb

Browse files
committed
streams: Prevent RST_STREAM from being sent multiple times
Previously, RST_STREAM would be sent even when the stream closed by state was in SEND_RST_STREAM. This fix instead checks which peer closed the stream initially, and then updates the closed by value from RECV_RST_STREAM to SEND_RST_STREAM after a RST_STREAM frame has been sent on a previously reset stream. Streams that have a closed by value of SEND_RST_STREAM now ignore all frames.
1 parent 6a03e70 commit ec52beb

File tree

3 files changed

+121
-46
lines changed

3 files changed

+121
-46
lines changed

src/h2/connection.py

+61-18
Original file line numberDiff line numberDiff line change
@@ -1486,16 +1486,17 @@ def _receive_frame(self, frame):
14861486
# I don't love using __class__ here, maybe reconsider it.
14871487
frames, events = self._frame_dispatch_table[frame.__class__](frame)
14881488
except StreamClosedError as e:
1489-
# If the stream was closed by RST_STREAM, we just send a RST_STREAM
1490-
# to the remote peer. Otherwise, this is a connection error, and so
1491-
# we will re-raise to trigger one.
1492-
if self._stream_is_closed_by_reset(e.stream_id):
1489+
if e._connection_error:
1490+
raise
1491+
else:
1492+
# A StreamClosedError is raised when a stream wants to send a
1493+
# RST_STREAM frame. Since the H2Stream is the authoritative source
1494+
# of its own state, we always respect its wishes here.
1495+
14931496
f = RstStreamFrame(e.stream_id)
14941497
f.error_code = e.error_code
14951498
self._prepare_for_sending([f])
14961499
events = e._events
1497-
else:
1498-
raise
14991500
except StreamIDTooLowError as e:
15001501
# The stream ID seems invalid. This may happen when the closed
15011502
# stream has been cleaned up, or when the remote peer has opened a
@@ -1506,10 +1507,18 @@ def _receive_frame(self, frame):
15061507
# is either a stream error or a connection error.
15071508
if self._stream_is_closed_by_reset(e.stream_id):
15081509
# Closed by RST_STREAM is a stream error.
1509-
f = RstStreamFrame(e.stream_id)
1510-
f.error_code = ErrorCodes.STREAM_CLOSED
1511-
self._prepare_for_sending([f])
1512-
events = []
1510+
if self._stream_is_closed_by_peer_reset(e.stream_id):
1511+
self._closed_streams[e.stream_id] = StreamClosedBy.SEND_RST_STREAM
1512+
1513+
f = RstStreamFrame(e.stream_id)
1514+
f.error_code = ErrorCodes.STREAM_CLOSED
1515+
self._prepare_for_sending([f])
1516+
events = []
1517+
else:
1518+
# Stream was closed by a local reset. A stream SHOULD NOT
1519+
# send additional RST_STREAM frames. Ignore.
1520+
events = []
1521+
pass
15131522
elif self._stream_is_closed_by_end(e.stream_id):
15141523
# Closed by END_STREAM is a connection error.
15151524
raise StreamClosedError(e.stream_id)
@@ -1655,13 +1664,32 @@ def _handle_data_on_closed_stream(self, events, exc, frame):
16551664
"auto-emitted a WINDOW_UPDATE by %d",
16561665
frame.stream_id, conn_increment
16571666
)
1658-
f = RstStreamFrame(exc.stream_id)
1659-
f.error_code = exc.error_code
1660-
frames.append(f)
1661-
self.config.logger.debug(
1662-
"Stream %d already CLOSED or cleaned up - "
1663-
"auto-emitted a RST_FRAME" % frame.stream_id
1664-
)
1667+
1668+
send_rst_frame = False
1669+
1670+
if frame.stream_id in self._closed_streams:
1671+
closed_by = self._closed_streams[frame.stream_id]
1672+
1673+
if closed_by == StreamClosedBy.RECV_RST_STREAM:
1674+
self._closed_streams[frame.stream_id] = StreamClosedBy.SEND_RST_STREAM
1675+
send_rst_frame = True
1676+
elif closed_by == StreamClosedBy.SEND_RST_STREAM:
1677+
# Do not send additional RST_STREAM frames
1678+
pass
1679+
else:
1680+
# Protocol error
1681+
raise StreamClosedError(frame.stream_id)
1682+
else:
1683+
send_rst_frame = True
1684+
1685+
if send_rst_frame:
1686+
f = RstStreamFrame(exc.stream_id)
1687+
f.error_code = exc.error_code
1688+
frames.append(f)
1689+
self.config.logger.debug(
1690+
"Stream %d already CLOSED or cleaned up - "
1691+
"auto-emitted a RST_FRAME" % frame.stream_id
1692+
)
16651693
return frames, events + exc._events
16661694

16671695
def _receive_data_frame(self, frame):
@@ -1677,6 +1705,8 @@ def _receive_data_frame(self, frame):
16771705
flow_controlled_length
16781706
)
16791707

1708+
stream = None
1709+
16801710
try:
16811711
stream = self._get_stream_by_id(frame.stream_id)
16821712
frames, stream_events = stream.receive_data(
@@ -1685,6 +1715,11 @@ def _receive_data_frame(self, frame):
16851715
flow_controlled_length
16861716
)
16871717
except StreamClosedError as e:
1718+
# If this exception originated from a yet-to-be clenaed up stream,
1719+
# check if it should be a connection error
1720+
if stream is not None and e._connection_error:
1721+
raise
1722+
16881723
# This stream is either marked as CLOSED or already gone from our
16891724
# internal state.
16901725
return self._handle_data_on_closed_stream(events, e, frame)
@@ -1962,7 +1997,7 @@ def _stream_closed_by(self, stream_id):
19621997
before opening this one.
19631998
"""
19641999
if stream_id in self.streams:
1965-
return self.streams[stream_id].closed_by
2000+
return self.streams[stream_id].closed_by # pragma: no cover
19662001
if stream_id in self._closed_streams:
19672002
return self._closed_streams[stream_id]
19682003
return None
@@ -1976,6 +2011,14 @@ def _stream_is_closed_by_reset(self, stream_id):
19762011
StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM
19772012
)
19782013

2014+
def _stream_is_closed_by_peer_reset(self, stream_id):
2015+
"""
2016+
Returns ``True`` if the stream was closed by sending or receiving a
2017+
RST_STREAM frame. Returns ``False`` otherwise.
2018+
"""
2019+
return (self._stream_closed_by(stream_id) ==
2020+
StreamClosedBy.RECV_RST_STREAM)
2021+
19792022
def _stream_is_closed_by_end(self, stream_id):
19802023
"""
19812024
Returns ``True`` if the stream was closed by sending or receiving an

src/h2/exceptions.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class StreamClosedError(NoSuchStreamError):
104104
that the stream has since been closed, and that all state relating to that
105105
stream has been removed.
106106
"""
107-
def __init__(self, stream_id):
107+
def __init__(self, stream_id, connection_error=True):
108108
#: The stream ID corresponds to the nonexistent stream.
109109
self.stream_id = stream_id
110110

@@ -115,6 +115,12 @@ def __init__(self, stream_id):
115115
# external users that may receive a StreamClosedError.
116116
self._events = []
117117

118+
# If this is a connection error or a stream error. This exception
119+
# is used to send a `RST_STREAM` frame on stream errors. If
120+
# connection_error is false, H2Connection will suppress this
121+
# exception after sending the reset frame.
122+
self._connection_error = connection_error
123+
118124

119125
class InvalidSettingsValueError(ProtocolError, ValueError):
120126
"""

src/h2/stream.py

+53-27
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ def reset_stream_on_error(self, previous_state):
316316
"""
317317
self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
318318

319-
error = StreamClosedError(self.stream_id)
319+
error = StreamClosedError(self.stream_id, connection_error=False)
320320

321321
event = StreamReset()
322322
event.stream_id = self.stream_id
@@ -334,8 +334,31 @@ def recv_on_closed_stream(self, previous_state):
334334
a stream error or connection error with type STREAM_CLOSED, depending
335335
on the specific frame. The error handling is done at a higher level:
336336
this just raises the appropriate error.
337-
"""
338-
raise StreamClosedError(self.stream_id)
337+
338+
RFC:
339+
Normally, an endpoint SHOULD NOT send more than one RST_STREAM
340+
frame for any stream. However, an endpoint MAY send additional
341+
RST_STREAM frames if it receives frames on a closed stream after
342+
more than a round-trip time. This behavior is permitted to deal
343+
with misbehaving implementations.
344+
345+
Implementation:
346+
Raising StreamClosedError causes the RST_STREAM frame to be sent.
347+
If the stream closed_by value is SEND_RST_STREAM, ignore this
348+
instead of raising, such that only one RST_STREAM frame is sent.
349+
There is currently now latency tracking, and as such measuring
350+
round-trip time for allowed additional RST_STREAM frames which
351+
MAY be sent cannot be implemented.
352+
"""
353+
354+
if self.stream_closed_by == StreamClosedBy.RECV_RST_STREAM:
355+
self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
356+
raise StreamClosedError(self.stream_id, connection_error=False)
357+
elif self.stream_closed_by in (StreamClosedBy.RECV_END_STREAM,
358+
StreamClosedBy.SEND_END_STREAM):
359+
raise StreamClosedError(self.stream_id)
360+
361+
return []
339362

340363
def send_on_closed_stream(self, previous_state):
341364
"""
@@ -1040,23 +1063,24 @@ def receive_headers(self, headers, end_stream, header_encoding):
10401063

10411064
events = self.state_machine.process_input(input_)
10421065

1043-
if end_stream:
1044-
es_events = self.state_machine.process_input(
1045-
StreamInputs.RECV_END_STREAM
1046-
)
1047-
events[0].stream_ended = es_events[0]
1048-
events += es_events
1066+
if len(events) > 0:
1067+
if end_stream:
1068+
es_events = self.state_machine.process_input(
1069+
StreamInputs.RECV_END_STREAM
1070+
)
1071+
events[0].stream_ended = es_events[0]
1072+
events += es_events
10491073

1050-
self._initialize_content_length(headers)
1074+
self._initialize_content_length(headers)
10511075

1052-
if isinstance(events[0], TrailersReceived):
1053-
if not end_stream:
1054-
raise ProtocolError("Trailers must have END_STREAM set")
1076+
if isinstance(events[0], TrailersReceived):
1077+
if not end_stream:
1078+
raise ProtocolError("Trailers must have END_STREAM set")
10551079

1056-
hdr_validation_flags = self._build_hdr_validation_flags(events)
1057-
events[0].headers = self._process_received_headers(
1058-
headers, hdr_validation_flags, header_encoding
1059-
)
1080+
hdr_validation_flags = self._build_hdr_validation_flags(events)
1081+
events[0].headers = self._process_received_headers(
1082+
headers, hdr_validation_flags, header_encoding
1083+
)
10601084
return [], events
10611085

10621086
def receive_data(self, data, end_stream, flow_control_len):
@@ -1068,18 +1092,20 @@ def receive_data(self, data, end_stream, flow_control_len):
10681092
"set to %d", self, end_stream, flow_control_len
10691093
)
10701094
events = self.state_machine.process_input(StreamInputs.RECV_DATA)
1071-
self._inbound_window_manager.window_consumed(flow_control_len)
1072-
self._track_content_length(len(data), end_stream)
10731095

1074-
if end_stream:
1075-
es_events = self.state_machine.process_input(
1076-
StreamInputs.RECV_END_STREAM
1077-
)
1078-
events[0].stream_ended = es_events[0]
1079-
events.extend(es_events)
1096+
if len(events) > 0:
1097+
self._inbound_window_manager.window_consumed(flow_control_len)
1098+
self._track_content_length(len(data), end_stream)
1099+
1100+
if end_stream:
1101+
es_events = self.state_machine.process_input(
1102+
StreamInputs.RECV_END_STREAM
1103+
)
1104+
events[0].stream_ended = es_events[0]
1105+
events.extend(es_events)
10801106

1081-
events[0].data = data
1082-
events[0].flow_controlled_length = flow_control_len
1107+
events[0].data = data
1108+
events[0].flow_controlled_length = flow_control_len
10831109
return [], events
10841110

10851111
def receive_window_update(self, increment):

0 commit comments

Comments
 (0)