diff --git a/go.mod b/go.mod index 8ab3f40e1..3bd487f5a 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module golang.org/x/net go 1.18 require ( - golang.org/x/crypto v0.16.0 - golang.org/x/sys v0.15.0 - golang.org/x/term v0.15.0 + golang.org/x/crypto v0.18.0 + golang.org/x/sys v0.16.0 + golang.org/x/term v0.16.0 golang.org/x/text v0.14.0 ) diff --git a/go.sum b/go.sum index bb6ed68a0..8eeaf16c6 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ -golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= -golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= diff --git a/http2/server_test.go b/http2/server_test.go index 22657cbfe..1fdd191ef 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -145,6 +145,12 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{} ConfigureServer(ts.Config, h2server) + // Go 1.22 changes the default minimum TLS version to TLS 1.2. + // In order to properly test cases where we want to reject low + // TLS versions, we need to explicitly configure the minimum + // version here. + ts.Config.TLSConfig.MinVersion = tls.VersionTLS10 + st := &serverTester{ t: t, ts: ts, diff --git a/internal/quic/cmd/interop/Dockerfile b/internal/quic/cmd/interop/Dockerfile index 4b52e5356..b60999a86 100644 --- a/internal/quic/cmd/interop/Dockerfile +++ b/internal/quic/cmd/interop/Dockerfile @@ -9,7 +9,7 @@ ENV GOVERSION=1.21.1 RUN platform=$(echo ${TARGETPLATFORM} | tr '/' '-') && \ filename="go${GOVERSION}.${platform}.tar.gz" && \ - wget https://dl.google.com/go/${filename} && \ + wget --no-verbose https://dl.google.com/go/${filename} && \ tar xfz ${filename} && \ rm ${filename} diff --git a/internal/quic/conn.go b/internal/quic/conn.go index 31e789b1d..6d79013eb 100644 --- a/internal/quic/conn.go +++ b/internal/quic/conn.go @@ -263,10 +263,7 @@ var errIdleTimeout = errors.New("idle timeout") // The loop processes messages from c.msgc and timer events. // Other goroutines may examine or modify conn state by sending the loop funcs to execute. func (c *Conn) loop(now time.Time) { - defer close(c.donec) - defer c.tls.Close() - defer c.endpoint.connDrained(c) - defer c.logConnectionClosed() + defer c.cleanup() // The connection timer sends a message to the connection loop on expiry.
// We need to give it an expiry when creating it, so set the initial timeout to @@ -346,6 +343,13 @@ func (c *Conn) loop(now time.Time) { } } +func (c *Conn) cleanup() { + c.logConnectionClosed() + c.endpoint.connDrained(c) + c.tls.Close() + close(c.donec) +} + // sendMsg sends a message to the conn's loop. // It does not wait for the message to be processed. // The conn may close before processing the message, in which case it is lost. @@ -365,12 +369,37 @@ func (c *Conn) wake() { } // runOnLoop executes a function within the conn's loop goroutine. -func (c *Conn) runOnLoop(f func(now time.Time, c *Conn)) error { +func (c *Conn) runOnLoop(ctx context.Context, f func(now time.Time, c *Conn)) error { donec := make(chan struct{}) - c.sendMsg(func(now time.Time, c *Conn) { + msg := func(now time.Time, c *Conn) { defer close(donec) f(now, c) - }) + } + if c.testHooks != nil { + // In tests, we can't rely on being able to send a message immediately: + // c.msgc might be full, and testConnHooks.nextMessage might be waiting + // for us to block before it processes the next message. + // To avoid a deadlock, we send the message in waitUntil. + // If msgc is empty, the message is buffered. + // If msgc is full, we block and let nextMessage process the queue. + msgc := c.msgc + c.testHooks.waitUntil(ctx, func() bool { + for { + select { + case msgc <- msg: + msgc = nil // send msg only once + case <-donec: + return true + case <-c.donec: + return true + default: + return false + } + } + }) + } else { + c.sendMsg(msg) + } select { case <-donec: case <-c.donec: diff --git a/internal/quic/conn_async_test.go b/internal/quic/conn_async_test.go index dc2a57f9d..4671f8340 100644 --- a/internal/quic/conn_async_test.go +++ b/internal/quic/conn_async_test.go @@ -41,7 +41,7 @@ type asyncOp[T any] struct { err error caller string - state *asyncTestState + tc *testConn donec chan struct{} cancelFunc context.CancelFunc } @@ -55,7 +55,7 @@ func (a *asyncOp[T]) cancel() { default: } a.cancelFunc() - <-a.state.notify + <-a.tc.asyncTestState.notify select { case <-a.donec: default: @@ -73,6 +73,7 @@ var errNotDone = errors.New("async op is not done") // control over the progress of operations, an asyncOp can only // become done in reaction to the test taking some action. func (a *asyncOp[T]) result() (v T, err error) { + a.tc.wait() select { case <-a.donec: return a.v, a.err @@ -94,8 +95,8 @@ type asyncContextKey struct{} // The function f should call a blocking function such as // Stream.Write or Conn.AcceptStream and return its result. // It must use the provided context. 
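The runOnLoop change above funnels work onto the connection's single loop goroutine and waits for it to run, giving up if the loop has already exited; in tests it additionally buffers the message through testHooks.waitUntil so the harness cannot deadlock. Below is a standalone sketch of the non-test path only; miniLoop and its fields are hypothetical names, not part of this patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// miniLoop stands in for Conn's event loop: one goroutine owns all state
// and executes the funcs sent to msgc.
type miniLoop struct {
	msgc  chan func(now time.Time)
	donec chan struct{} // closed when the loop exits
}

func newMiniLoop() *miniLoop {
	l := &miniLoop{
		msgc:  make(chan func(now time.Time), 8),
		donec: make(chan struct{}),
	}
	go func() {
		defer close(l.donec)
		for f := range l.msgc {
			f(time.Now())
		}
	}()
	return l
}

// runOnLoop mirrors Conn.runOnLoop: run f on the loop goroutine and wait for
// it to finish, unless the loop shuts down or ctx is canceled first.
func (l *miniLoop) runOnLoop(ctx context.Context, f func(now time.Time)) error {
	done := make(chan struct{})
	msg := func(now time.Time) {
		defer close(done)
		f(now)
	}
	select {
	case l.msgc <- msg:
	case <-l.donec:
		return errors.New("loop closed")
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case <-done:
		return nil
	case <-l.donec:
		return errors.New("loop closed")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	l := newMiniLoop()
	_ = l.runOnLoop(context.Background(), func(now time.Time) {
		fmt.Println("ran on the loop goroutine at", now)
	})
	close(l.msgc) // stop the loop
	<-l.donec
}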
-func runAsync[T any](ts *testConn, f func(context.Context) (T, error)) *asyncOp[T] { - as := &ts.asyncTestState +func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] { + as := &tc.asyncTestState if as.notify == nil { as.notify = make(chan struct{}) as.mu.Lock() @@ -106,7 +107,7 @@ func runAsync[T any](ts *testConn, f func(context.Context) (T, error)) *asyncOp[ ctx := context.WithValue(context.Background(), asyncContextKey{}, true) ctx, cancel := context.WithCancel(ctx) a := &asyncOp[T]{ - state: as, + tc: tc, caller: fmt.Sprintf("%v:%v", filepath.Base(file), line), donec: make(chan struct{}), cancelFunc: cancel, @@ -116,14 +117,15 @@ func runAsync[T any](ts *testConn, f func(context.Context) (T, error)) *asyncOp[ close(a.donec) as.notify <- struct{}{} }() - ts.t.Cleanup(func() { + tc.t.Cleanup(func() { if _, err := a.result(); err == errNotDone { - ts.t.Errorf("%v: async operation is still executing at end of test", a.caller) + tc.t.Errorf("%v: async operation is still executing at end of test", a.caller) a.cancel() } }) // Wait for the operation to either finish or block. <-as.notify + tc.wait() return a } diff --git a/internal/quic/conn_close.go b/internal/quic/conn_close.go index 246a12638..1798d0536 100644 --- a/internal/quic/conn_close.go +++ b/internal/quic/conn_close.go @@ -71,7 +71,10 @@ func (c *Conn) lifetimeInit() { c.lifetime.donec = make(chan struct{}) } -var errNoPeerResponse = errors.New("peer did not respond to CONNECTION_CLOSE") +var ( + errNoPeerResponse = errors.New("peer did not respond to CONNECTION_CLOSE") + errConnClosed = errors.New("connection closed") +) // advance is called when time passes. func (c *Conn) lifetimeAdvance(now time.Time) (done bool) { @@ -91,13 +94,21 @@ func (c *Conn) lifetimeAdvance(now time.Time) (done bool) { // setState sets the conn state. func (c *Conn) setState(now time.Time, state connState) { + if c.lifetime.state == state { + return + } + c.lifetime.state = state switch state { case connStateClosing, connStateDraining: if c.lifetime.drainEndTime.IsZero() { c.lifetime.drainEndTime = now.Add(3 * c.loss.ptoBasePeriod()) } + case connStateDone: + c.setFinalError(nil) + } + if state != connStateAlive { + c.streamsCleanup() } - c.lifetime.state = state } // confirmHandshake is called when the TLS handshake completes. diff --git a/internal/quic/conn_close_test.go b/internal/quic/conn_close_test.go index 49881e62f..63d4911e8 100644 --- a/internal/quic/conn_close_test.go +++ b/internal/quic/conn_close_test.go @@ -216,3 +216,64 @@ func TestConnCloseClosedByEndpoint(t *testing.T) { code: errNo, }) } + +func testConnCloseUnblocks(t *testing.T, f func(context.Context, *testConn) error, opts ...any) { + tc := newTestConn(t, clientSide, opts...) 
+ tc.handshake() + op := runAsync(tc, func(ctx context.Context) (struct{}, error) { + return struct{}{}, f(ctx, tc) + }) + if _, err := op.result(); err != errNotDone { + t.Fatalf("before abort, op = %v, want errNotDone", err) + } + tc.conn.Abort(nil) + if _, err := op.result(); err == nil || err == errNotDone { + t.Fatalf("after abort, op = %v, want error", err) + } +} + +func TestConnCloseUnblocksAcceptStream(t *testing.T) { + testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error { + _, err := tc.conn.AcceptStream(ctx) + return err + }, permissiveTransportParameters) +} + +func TestConnCloseUnblocksNewStream(t *testing.T) { + testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error { + _, err := tc.conn.NewStream(ctx) + return err + }) +} + +func TestConnCloseUnblocksStreamRead(t *testing.T) { + testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error { + s := newLocalStream(t, tc, bidiStream) + buf := make([]byte, 16) + _, err := s.ReadContext(ctx, buf) + return err + }, permissiveTransportParameters) +} + +func TestConnCloseUnblocksStreamWrite(t *testing.T) { + testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error { + s := newLocalStream(t, tc, bidiStream) + buf := make([]byte, 32) + _, err := s.WriteContext(ctx, buf) + return err + }, permissiveTransportParameters, func(c *Config) { + c.MaxStreamWriteBufferSize = 16 + }) +} + +func TestConnCloseUnblocksStreamClose(t *testing.T) { + testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error { + s := newLocalStream(t, tc, bidiStream) + buf := make([]byte, 16) + _, err := s.WriteContext(ctx, buf) + if err != nil { + return err + } + return s.CloseContext(ctx) + }, permissiveTransportParameters) +} diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go index 156ef5dd5..045bf861c 100644 --- a/internal/quic/conn_recv.go +++ b/internal/quic/conn_recv.go @@ -101,6 +101,9 @@ func (c *Conn) handleLongHeader(now time.Time, ptype packetType, space numberSpa if logPackets { logInboundLongPacket(c, p) } + if c.logEnabled(QLogLevelPacket) { + c.logLongPacketReceived(p, buf[:n]) + } c.connIDState.handlePacket(c, p.ptype, p.srcConnID) ackEliciting := c.handleFrames(now, ptype, space, p.payload) c.acks[space].receive(now, space, p.num, ackEliciting) @@ -149,6 +152,9 @@ func (c *Conn) handle1RTT(now time.Time, buf []byte) int { if logPackets { logInboundShortPacket(c, p) } + if c.logEnabled(QLogLevelPacket) { + c.log1RTTPacketReceived(p, buf) + } ackEliciting := c.handleFrames(now, packetType1RTT, appDataSpace, p.payload) c.acks[appDataSpace].receive(now, appDataSpace, p.num, ackEliciting) return len(buf) diff --git a/internal/quic/conn_send.go b/internal/quic/conn_send.go index 4065474d2..ccb467591 100644 --- a/internal/quic/conn_send.go +++ b/internal/quic/conn_send.go @@ -60,7 +60,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { pad := false var sentInitial *sentPacket if c.keysInitial.canWrite() { - pnumMaxAcked := c.acks[initialSpace].largestSeen() + pnumMaxAcked := c.loss.spaces[initialSpace].maxAcked pnum := c.loss.nextNumber(initialSpace) p := longPacket{ ptype: packetTypeInitial, @@ -75,6 +75,9 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { if logPackets { logSentPacket(c, packetTypeInitial, pnum, p.srcConnID, p.dstConnID, c.w.payload()) } + if c.logEnabled(QLogLevelPacket) && len(c.w.payload()) > 0 { + c.logPacketSent(packetTypeInitial, pnum, p.srcConnID, p.dstConnID, c.w.packetLen(), c.w.payload()) + } sentInitial = 
c.w.finishProtectedLongHeaderPacket(pnumMaxAcked, c.keysInitial.w, p) if sentInitial != nil { c.idleHandlePacketSent(now, sentInitial) @@ -90,7 +93,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { // Handshake packet. if c.keysHandshake.canWrite() { - pnumMaxAcked := c.acks[handshakeSpace].largestSeen() + pnumMaxAcked := c.loss.spaces[handshakeSpace].maxAcked pnum := c.loss.nextNumber(handshakeSpace) p := longPacket{ ptype: packetTypeHandshake, @@ -104,6 +107,9 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { if logPackets { logSentPacket(c, packetTypeHandshake, pnum, p.srcConnID, p.dstConnID, c.w.payload()) } + if c.logEnabled(QLogLevelPacket) && len(c.w.payload()) > 0 { + c.logPacketSent(packetTypeHandshake, pnum, p.srcConnID, p.dstConnID, c.w.packetLen(), c.w.payload()) + } if sent := c.w.finishProtectedLongHeaderPacket(pnumMaxAcked, c.keysHandshake.w, p); sent != nil { c.idleHandlePacketSent(now, sent) c.loss.packetSent(now, handshakeSpace, sent) @@ -118,7 +124,7 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { // 1-RTT packet. if c.keysAppData.canWrite() { - pnumMaxAcked := c.acks[appDataSpace].largestSeen() + pnumMaxAcked := c.loss.spaces[appDataSpace].maxAcked pnum := c.loss.nextNumber(appDataSpace) c.w.start1RTTPacket(pnum, pnumMaxAcked, dstConnID) c.appendFrames(now, appDataSpace, pnum, limit) @@ -132,6 +138,9 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) { if logPackets { logSentPacket(c, packetType1RTT, pnum, nil, dstConnID, c.w.payload()) } + if c.logEnabled(QLogLevelPacket) && len(c.w.payload()) > 0 { + c.logPacketSent(packetType1RTT, pnum, nil, dstConnID, c.w.packetLen(), c.w.payload()) + } if sent := c.w.finish1RTTPacket(pnum, pnumMaxAcked, dstConnID, &c.keysAppData); sent != nil { c.idleHandlePacketSent(now, sent) c.loss.packetSent(now, appDataSpace, sent) @@ -213,11 +222,7 @@ func (c *Conn) appendFrames(now time.Time, space numberSpace, pnum packetNumber, // Either we are willing to send an ACK-only packet, // or we've added additional frames. c.acks[space].sentAck() - if !c.w.sent.ackEliciting && c.keysAppData.needAckEliciting() { - // The peer has initiated a key update. - // We haven't sent them any packets yet in the new phase. - // Make this an ack-eliciting packet. - // Their ack of this packet will complete the key update. + if !c.w.sent.ackEliciting && c.shouldMakePacketAckEliciting() { c.w.appendPingFrame() } }() @@ -322,6 +327,30 @@ func (c *Conn) appendFrames(now time.Time, space numberSpace, pnum packetNumber, } } +// shouldMakePacketAckEliciting is called when sending a packet containing nothing but an ACK frame. +// It reports whether we should add a PING frame to the packet to make it ack-eliciting. +func (c *Conn) shouldMakePacketAckEliciting() bool { + if c.keysAppData.needAckEliciting() { + // The peer has initiated a key update. + // We haven't sent them any packets yet in the new phase. + // Make this an ack-eliciting packet. + // Their ack of this packet will complete the key update. + return true + } + if c.loss.consecutiveNonAckElicitingPackets >= 19 { + // We've sent a run of non-ack-eliciting packets. + // Add in an ack-eliciting one every once in a while so the peer + // lets us know which ones have arrived. + // + // Google QUICHE injects a PING after sending 19 packets. We do the same. + // + // https://www.rfc-editor.org/rfc/rfc9000#section-13.2.4-2 + return true + } + // TODO: Consider making every packet sent when in PTO ack-eliciting to speed up recovery. 
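shouldMakePacketAckEliciting above, together with the consecutiveNonAckElicitingPackets counter added to lossState later in this patch, bounds how long the connection can keep sending ACK-only packets before it adds a PING to elicit an acknowledgment. A standalone sketch of that heuristic follows; ackElicitingTracker is a hypothetical name, and the threshold of 19 is the one cited in the comment above.

package main

import "fmt"

type ackElicitingTracker struct {
	consecutiveNonAckEliciting int
}

// packetSent updates the run length, as lossState.packetSent does in this patch.
func (t *ackElicitingTracker) packetSent(ackEliciting bool) {
	if ackEliciting {
		t.consecutiveNonAckEliciting = 0
	} else {
		t.consecutiveNonAckEliciting++
	}
}

// shouldAddPing reports whether the next ACK-only packet should also carry a
// PING so the peer will acknowledge it.
func (t *ackElicitingTracker) shouldAddPing() bool {
	return t.consecutiveNonAckEliciting >= 19
}

func main() {
	var t ackElicitingTracker
	for i := 0; i < 42; i++ {
		if t.shouldAddPing() {
			fmt.Printf("packet %d: ACK+PING (ack-eliciting)\n", i)
			t.packetSent(true)
			continue
		}
		fmt.Printf("packet %d: ACK only\n", i)
		t.packetSent(false)
	}
}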
+ return false +} + func (c *Conn) appendAckFrame(now time.Time, space numberSpace) bool { seen, delay := c.acks[space].acksToSend(now) if len(seen) == 0 { diff --git a/internal/quic/conn_send_test.go b/internal/quic/conn_send_test.go new file mode 100644 index 000000000..2205ff2f7 --- /dev/null +++ b/internal/quic/conn_send_test.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "testing" + "time" +) + +func TestAckElicitingAck(t *testing.T) { + // "A receiver that sends only non-ack-eliciting packets [...] might not receive + // an acknowledgment for a long period of time. + // [...] a receiver could send a [...] ack-eliciting frame occasionally [...] + // to elicit an ACK from the peer." + // https://www.rfc-editor.org/rfc/rfc9000#section-13.2.4-2 + // + // Send a bunch of ack-eliciting packets, verify that the conn doesn't just + // send ACKs in response. + tc := newTestConn(t, clientSide, permissiveTransportParameters) + tc.handshake() + const count = 100 + for i := 0; i < count; i++ { + tc.advance(1 * time.Millisecond) + tc.writeFrames(packetType1RTT, + debugFramePing{}, + ) + got, _ := tc.readFrame() + switch got.(type) { + case debugFrameAck: + continue + case debugFramePing: + return + } + } + t.Errorf("after sending %v PINGs, got no ack-eliciting response", count) +} + +func TestSendPacketNumberSize(t *testing.T) { + tc := newTestConn(t, clientSide, permissiveTransportParameters) + tc.handshake() + + recvPing := func() *testPacket { + t.Helper() + tc.conn.ping(appDataSpace) + p := tc.readPacket() + if p == nil { + t.Fatalf("want packet containing PING, got none") + } + return p + } + + // Desynchronize the packet numbers the conn is sending and the ones it is receiving, + // by having the conn send a number of unacked packets. + for i := 0; i < 16; i++ { + recvPing() + } + + // Establish the maximum packet number the conn has received an ACK for. + maxAcked := recvPing().num + tc.writeAckForAll() + + // Make the conn send a sequence of packets. + // Check that the packet number is encoded with two bytes once the difference between the + // current packet and the max acked one is sufficiently large. + for want := maxAcked + 1; want < maxAcked+0x100; want++ { + p := recvPing() + if p.num != want { + t.Fatalf("received packet number %v, want %v", p.num, want) + } + gotPnumLen := int(p.header&0x03) + 1 + wantPnumLen := 1 + if p.num-maxAcked >= 0x80 { + wantPnumLen = 2 + } + if gotPnumLen != wantPnumLen { + t.Fatalf("packet number 0x%x encoded with %v bytes, want %v (max acked = %v)", p.num, gotPnumLen, wantPnumLen, maxAcked) + } + } +} diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go index 83ab5554c..87cfd297e 100644 --- a/internal/quic/conn_streams.go +++ b/internal/quic/conn_streams.go @@ -16,8 +16,14 @@ import ( type streamsState struct { queue queue[*Stream] // new, peer-created streams - streamsMu sync.Mutex - streams map[streamID]*Stream + // All peer-created streams. + // + // Implicitly created streams are included as an empty entry in the map. + // (For example, if we receive a frame for stream 4, we implicitly create stream 0 and + // insert an empty entry for it to the map.) + // + // The map value is maybeStream rather than *Stream as a reminder that values can be nil. + streams map[streamID]maybeStream // Limits on the number of streams, indexed by streamType. 
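TestSendPacketNumberSize, added above in conn_send_test.go, expects the sender to switch from a one-byte to a two-byte packet number once the distance from the largest acknowledged packet reaches 0x80. A standalone sketch of a truncation rule consistent with that expectation follows; it illustrates RFC 9000 Appendix A and is not copied from the package's encoder.

package main

import "fmt"

// packetNumberLength reports how many low-order bytes of pnum must be sent so
// that a receiver which has seen maxAcked can reconstruct pnum unambiguously:
// the encoded window (2^(8*n)) has to be more than twice the distance.
func packetNumberLength(pnum, maxAcked int64) int {
	switch d := pnum - maxAcked; {
	case d < 0x80:
		return 1
	case d < 0x8000:
		return 2
	case d < 0x800000:
		return 3
	default:
		return 4
	}
}

func main() {
	maxAcked := int64(17)
	for _, pnum := range []int64{maxAcked + 1, maxAcked + 0x7f, maxAcked + 0x80, maxAcked + 0x8000} {
		fmt.Printf("pnum %d, max acked %d -> %d byte(s)\n",
			pnum, maxAcked, packetNumberLength(pnum, maxAcked))
	}
}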
localLimit [streamTypeCount]localStreamLimits @@ -39,8 +45,13 @@ type streamsState struct { queueData streamRing // streams with only flow-controlled frames } +// maybeStream is a possibly nil *Stream. See streamsState.streams. +type maybeStream struct { + s *Stream +} + func (c *Conn) streamsInit() { - c.streams.streams = make(map[streamID]*Stream) + c.streams.streams = make(map[streamID]maybeStream) c.streams.queue = newQueue[*Stream]() c.streams.localLimit[bidiStream].init() c.streams.localLimit[uniStream].init() @@ -49,6 +60,17 @@ func (c *Conn) streamsInit() { c.inflowInit() } +func (c *Conn) streamsCleanup() { + c.streams.queue.close(errConnClosed) + c.streams.localLimit[bidiStream].connHasClosed() + c.streams.localLimit[uniStream].connHasClosed() + for _, s := range c.streams.streams { + if s.s != nil { + s.s.connHasClosed() + } + } +} + // AcceptStream waits for and returns the next stream created by the peer. func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) { return c.streams.queue.get(ctx, c.testHooks) @@ -71,9 +93,6 @@ func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) { } func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) { - c.streams.streamsMu.Lock() - defer c.streams.streamsMu.Unlock() - num, err := c.streams.localLimit[styp].open(ctx, c) if err != nil { return nil, err @@ -89,7 +108,12 @@ func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, er s.inUnlock() s.outUnlock() - c.streams.streams[s.id] = s + // Modify c.streams on the conn's loop. + if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) { + c.streams.streams[s.id] = maybeStream{s} + }); err != nil { + return nil, err + } return s, nil } @@ -108,9 +132,7 @@ const ( // streamForID returns the stream with the given id. // If the stream does not exist, it returns nil. func (c *Conn) streamForID(id streamID) *Stream { - c.streams.streamsMu.Lock() - defer c.streams.streamsMu.Unlock() - return c.streams.streams[id] + return c.streams.streams[id].s } // streamForFrame returns the stream with the given id. @@ -135,11 +157,9 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) } } - c.streams.streamsMu.Lock() - defer c.streams.streamsMu.Unlock() - s, isOpen := c.streams.streams[id] - if s != nil { - return s + ms, isOpen := c.streams.streams[id] + if ms.s != nil { + return ms.s } num := id.num() @@ -176,10 +196,10 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) // with the same initiator and type and a lower number. // Add a nil entry to the streams map for each implicitly created stream. 
for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 { - c.streams.streams[n] = nil + c.streams.streams[n] = maybeStream{} } - s = newStream(c, id) + s := newStream(c, id) s.inmaxbuf = c.config.maxStreamReadBufferSize() s.inwin = c.config.maxStreamReadBufferSize() if id.streamType() == bidiStream { @@ -189,7 +209,7 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) s.inUnlock() s.outUnlock() - c.streams.streams[id] = s + c.streams.streams[id] = maybeStream{s} c.streams.queue.put(s) return s } @@ -393,7 +413,11 @@ func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool { c.streams.sendMu.Lock() defer c.streams.sendMu.Unlock() const pto = true - for _, s := range c.streams.streams { + for _, ms := range c.streams.streams { + s := ms.s + if s == nil { + continue + } const pto = true s.ingate.lock() inOK := s.appendInFramesLocked(w, pnum, pto) diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go index c90354db8..6815e403e 100644 --- a/internal/quic/conn_streams_test.go +++ b/internal/quic/conn_streams_test.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "math" + "sync" "testing" ) @@ -478,3 +479,85 @@ func TestStreamsCreateAndCloseRemote(t *testing.T) { t.Fatalf("after test, stream send queue is not empty; should be") } } + +func TestStreamsCreateConcurrency(t *testing.T) { + cli, srv := newLocalConnPair(t, &Config{}, &Config{}) + + srvdone := make(chan int) + go func() { + defer close(srvdone) + for streams := 0; ; streams++ { + s, err := srv.AcceptStream(context.Background()) + if err != nil { + srvdone <- streams + return + } + s.Close() + } + }() + + var wg sync.WaitGroup + const concurrency = 10 + const streams = 10 + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < streams; j++ { + s, err := cli.NewStream(context.Background()) + if err != nil { + t.Errorf("NewStream: %v", err) + return + } + s.Flush() + _, err = io.ReadAll(s) + if err != nil { + t.Errorf("ReadFull: %v", err) + } + s.Close() + } + }() + } + wg.Wait() + + cli.Abort(nil) + srv.Abort(nil) + if got, want := <-srvdone, concurrency*streams; got != want { + t.Errorf("accepted %v streams, want %v", got, want) + } +} + +func TestStreamsPTOWithImplicitStream(t *testing.T) { + ctx := canceledContext() + tc := newTestConn(t, serverSide, permissiveTransportParameters) + tc.handshake() + tc.ignoreFrame(frameTypeAck) + + // Peer creates stream 1, and implicitly creates stream 0. + tc.writeFrames(packetType1RTT, debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 1), + }) + + // We accept stream 1 and write data to it. + data := []byte("data") + s, err := tc.conn.AcceptStream(ctx) + if err != nil { + t.Fatalf("conn.AcceptStream() = %v, want stream", err) + } + s.Write(data) + s.Flush() + tc.wantFrame("data written to stream", + packetType1RTT, debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 1), + data: data, + }) + + // PTO expires, and the data is resent. 
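The streams map now stores maybeStream values so that implicitly created streams (lower-numbered streams of the same type that the peer has not named yet) can be recorded as empty entries, as the loop above does. A standalone sketch of that bookkeeping follows; the names are hypothetical, and it uses consecutive IDs instead of the real step-by-4 stream numbering.

package main

import "fmt"

type stream struct{ id int64 }

// maybeStream mirrors the patch's reminder that map values may be nil.
type maybeStream struct{ s *stream }

type streamMap struct {
	streams map[int64]maybeStream
	opened  int64 // lowest stream number not yet created
}

// streamForID returns the stream for id, implicitly creating every
// lower-numbered stream as an empty map entry, like streamForFrame above.
func (m *streamMap) streamForID(id int64) *stream {
	if ms, ok := m.streams[id]; ok {
		return ms.s // may be nil: the stream exists only implicitly
	}
	for n := m.opened; n < id; n++ {
		m.streams[n] = maybeStream{}
	}
	s := &stream{id: id}
	m.streams[id] = maybeStream{s}
	m.opened = id + 1
	return s
}

func main() {
	m := &streamMap{streams: make(map[int64]maybeStream)}
	fmt.Println(m.streamForID(4).id)     // 4: created explicitly
	fmt.Println(m.streams[2].s == nil)   // true: streams 0-3 exist only implicitly
	fmt.Println(m.streamForID(2) == nil) // true: an implicit stream has no *stream yet
}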
+ const pto = true + tc.triggerLossOrPTO(packetType1RTT, true) + tc.wantFrame("data resent after PTO expires", + packetType1RTT, debugFrameStream{ + id: newStreamID(clientSide, bidiStream, 1), + data: data, + }) +} diff --git a/internal/quic/conn_test.go b/internal/quic/conn_test.go index c57ba1487..ddf0740e2 100644 --- a/internal/quic/conn_test.go +++ b/internal/quic/conn_test.go @@ -13,15 +13,21 @@ import ( "errors" "flag" "fmt" + "log/slog" "math" "net/netip" "reflect" "strings" "testing" "time" + + "golang.org/x/net/internal/quic/qlog" ) -var testVV = flag.Bool("vv", false, "even more verbose test output") +var ( + testVV = flag.Bool("vv", false, "even more verbose test output") + qlogdir = flag.String("qlog", "", "write qlog logs to directory") +) func TestConnTestConn(t *testing.T) { tc := newTestConn(t, serverSide) @@ -30,10 +36,12 @@ func TestConnTestConn(t *testing.T) { t.Errorf("new conn timeout=%v, want %v (max_idle_timeout)", got, want) } - var ranAt time.Time - tc.conn.runOnLoop(func(now time.Time, c *Conn) { - ranAt = now - }) + ranAt, _ := runAsync(tc, func(ctx context.Context) (when time.Time, _ error) { + tc.conn.runOnLoop(ctx, func(now time.Time, c *Conn) { + when = now + }) + return + }).result() if !ranAt.Equal(tc.endpoint.now) { t.Errorf("func ran on loop at %v, want %v", ranAt, tc.endpoint.now) } @@ -41,9 +49,12 @@ func TestConnTestConn(t *testing.T) { nextTime := tc.endpoint.now.Add(defaultMaxIdleTimeout / 2) tc.advanceTo(nextTime) - tc.conn.runOnLoop(func(now time.Time, c *Conn) { - ranAt = now - }) + ranAt, _ = runAsync(tc, func(ctx context.Context) (when time.Time, _ error) { + tc.conn.runOnLoop(ctx, func(now time.Time, c *Conn) { + when = now + }) + return + }).result() if !ranAt.Equal(nextTime) { t.Errorf("func ran on loop at %v, want %v", ranAt, nextTime) } @@ -77,6 +88,7 @@ func (d testDatagram) String() string { type testPacket struct { ptype packetType + header byte version uint32 num packetNumber keyPhaseBit bool @@ -193,6 +205,10 @@ func newTestConn(t *testing.T, side connSide, opts ...any) *testConn { config := &Config{ TLSConfig: newTestTLSConfig(side), StatelessResetKey: testStatelessResetKey, + QLogLogger: slog.New(qlog.NewJSONHandler(qlog.HandlerOptions{ + Level: QLogLevelFrame, + Dir: *qlogdir, + })), } var cids newServerConnIDs if side == serverSide { @@ -279,6 +295,9 @@ func newTestConnForConn(t *testing.T, endpoint *testEndpoint, conn *Conn) *testC } tc.peerTLSConn.SetTransportParameters(marshalTransportParameters(peerProvidedParams)) tc.peerTLSConn.Start(context.Background()) + t.Cleanup(func() { + tc.peerTLSConn.Close() + }) return tc } @@ -591,12 +610,18 @@ func (tc *testConn) readFrame() (debugFrame, packetType) { func (tc *testConn) wantDatagram(expectation string, want *testDatagram) { tc.t.Helper() got := tc.readDatagram() - if !reflect.DeepEqual(got, want) { + if !datagramEqual(got, want) { tc.t.Fatalf("%v:\ngot datagram: %v\nwant datagram: %v", expectation, got, want) } } func datagramEqual(a, b *testDatagram) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } if a.paddedSize != b.paddedSize || a.addr != b.addr || len(a.packets) != len(b.packets) { @@ -614,7 +639,7 @@ func datagramEqual(a, b *testDatagram) bool { func (tc *testConn) wantPacket(expectation string, want *testPacket) { tc.t.Helper() got := tc.readPacket() - if !reflect.DeepEqual(got, want) { + if !packetEqual(got, want) { tc.t.Fatalf("%v:\ngot packet: %v\nwant packet: %v", expectation, got, want) } } @@ -622,8 +647,10 @@ func 
(tc *testConn) wantPacket(expectation string, want *testPacket) { func packetEqual(a, b *testPacket) bool { ac := *a ac.frames = nil + ac.header = 0 bc := *b bc.frames = nil + bc.header = 0 if !reflect.DeepEqual(ac, bc) { return false } @@ -831,6 +858,7 @@ func parseTestDatagram(t *testing.T, te *testEndpoint, tc *testConn, buf []byte) } d.packets = append(d.packets, &testPacket{ ptype: p.ptype, + header: buf[0], version: p.version, num: p.num, dstConnID: p.dstConnID, @@ -872,6 +900,7 @@ func parseTestDatagram(t *testing.T, te *testEndpoint, tc *testConn, buf []byte) } d.packets = append(d.packets, &testPacket{ ptype: packetType1RTT, + header: hdr[0], num: pnum, dstConnID: hdr[1:][:len(tc.peerConnID)], keyPhaseBit: hdr[0]&keyPhaseBit != 0, diff --git a/internal/quic/endpoint_test.go b/internal/quic/endpoint_test.go index f9fc80152..ab6cd1cf5 100644 --- a/internal/quic/endpoint_test.go +++ b/internal/quic/endpoint_test.go @@ -11,11 +11,13 @@ import ( "context" "crypto/tls" "io" + "log/slog" "net" "net/netip" - "reflect" "testing" "time" + + "golang.org/x/net/internal/quic/qlog" ) func TestConnect(t *testing.T) { @@ -48,7 +50,7 @@ func TestStreamTransfer(t *testing.T) { } }() - s, err := cli.NewStream(ctx) + s, err := cli.NewSendOnlyStream(ctx) if err != nil { t.Fatalf("NewStream: %v", err) } @@ -84,6 +86,12 @@ func newLocalEndpoint(t *testing.T, side connSide, conf *Config) *Endpoint { conf = &newConf conf.TLSConfig = newTestTLSConfig(side) } + if conf.QLogLogger == nil { + conf.QLogLogger = slog.New(qlog.NewJSONHandler(qlog.HandlerOptions{ + Level: QLogLevelFrame, + Dir: *qlogdir, + })) + } e, err := Listen("udp", "127.0.0.1:0", conf) if err != nil { t.Fatal(err) @@ -242,7 +250,7 @@ func (te *testEndpoint) readDatagram() *testDatagram { func (te *testEndpoint) wantDatagram(expectation string, want *testDatagram) { te.t.Helper() got := te.readDatagram() - if !reflect.DeepEqual(got, want) { + if !datagramEqual(got, want) { te.t.Fatalf("%v:\ngot datagram: %v\nwant datagram: %v", expectation, got, want) } } diff --git a/internal/quic/frame_debug.go b/internal/quic/frame_debug.go index dc8009037..0902c385f 100644 --- a/internal/quic/frame_debug.go +++ b/internal/quic/frame_debug.go @@ -8,6 +8,9 @@ package quic import ( "fmt" + "log/slog" + "strconv" + "time" ) // A debugFrame is a representation of the contents of a QUIC frame, @@ -15,6 +18,7 @@ import ( type debugFrame interface { String() string write(w *packetWriter) bool + LogValue() slog.Value } func parseDebugFrame(b []byte) (f debugFrame, n int) { @@ -97,6 +101,13 @@ func (f debugFramePadding) write(w *packetWriter) bool { return true } +func (f debugFramePadding) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "padding"), + slog.Int("length", f.size), + ) +} + // debugFramePing is a PING frame. type debugFramePing struct{} @@ -112,6 +123,12 @@ func (f debugFramePing) write(w *packetWriter) bool { return w.appendPingFrame() } +func (f debugFramePing) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "ping"), + ) +} + // debugFrameAck is an ACK frame. type debugFrameAck struct { ackDelay unscaledAckDelay @@ -126,7 +143,7 @@ func parseDebugFrameAck(b []byte) (f debugFrameAck, n int) { end: end, }) }) - // Ranges are parsed smallest to highest; reverse ranges slice to order them high to low. + // Ranges are parsed high to low; reverse ranges slice to order them low to high. 
for i := 0; i < len(f.ranges)/2; i++ { j := len(f.ranges) - 1 f.ranges[i], f.ranges[j] = f.ranges[j], f.ranges[i] @@ -146,6 +163,61 @@ func (f debugFrameAck) write(w *packetWriter) bool { return w.appendAckFrame(rangeset[packetNumber](f.ranges), f.ackDelay) } +func (f debugFrameAck) LogValue() slog.Value { + return slog.StringValue("error: debugFrameAck should not appear as a slog Value") +} + +// debugFrameScaledAck is an ACK frame with scaled ACK Delay. +// +// This type is used in qlog events, which need access to the delay as a duration. +type debugFrameScaledAck struct { + ackDelay time.Duration + ranges []i64range[packetNumber] +} + +func (f debugFrameScaledAck) LogValue() slog.Value { + var ackDelay slog.Attr + if f.ackDelay >= 0 { + ackDelay = slog.Duration("ack_delay", f.ackDelay) + } + return slog.GroupValue( + slog.String("frame_type", "ack"), + // Rather than trying to convert the ack ranges into the slog data model, + // pass a value that can JSON-encode itself. + slog.Any("acked_ranges", debugAckRanges(f.ranges)), + ackDelay, + ) +} + +type debugAckRanges []i64range[packetNumber] + +// AppendJSON appends a JSON encoding of the ack ranges to b, and returns it. +// This is different than the standard json.Marshaler, but more efficient. +// Since we only use this in cooperation with the qlog package, +// encoding/json compatibility is irrelevant. +func (r debugAckRanges) AppendJSON(b []byte) []byte { + b = append(b, '[') + for i, ar := range r { + start, end := ar.start, ar.end-1 // qlog ranges are closed-closed + if i != 0 { + b = append(b, ',') + } + b = append(b, '[') + b = strconv.AppendInt(b, int64(start), 10) + if start != end { + b = append(b, ',') + b = strconv.AppendInt(b, int64(end), 10) + } + b = append(b, ']') + } + b = append(b, ']') + return b +} + +func (r debugAckRanges) String() string { + return string(r.AppendJSON(nil)) +} + // debugFrameResetStream is a RESET_STREAM frame. type debugFrameResetStream struct { id streamID @@ -166,6 +238,14 @@ func (f debugFrameResetStream) write(w *packetWriter) bool { return w.appendResetStreamFrame(f.id, f.code, f.finalSize) } +func (f debugFrameResetStream) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "reset_stream"), + slog.Uint64("stream_id", uint64(f.id)), + slog.Uint64("final_size", uint64(f.finalSize)), + ) +} + // debugFrameStopSending is a STOP_SENDING frame. type debugFrameStopSending struct { id streamID @@ -185,6 +265,14 @@ func (f debugFrameStopSending) write(w *packetWriter) bool { return w.appendStopSendingFrame(f.id, f.code) } +func (f debugFrameStopSending) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "stop_sending"), + slog.Uint64("stream_id", uint64(f.id)), + slog.Uint64("error_code", uint64(f.code)), + ) +} + // debugFrameCrypto is a CRYPTO frame. type debugFrameCrypto struct { off int64 @@ -206,6 +294,14 @@ func (f debugFrameCrypto) write(w *packetWriter) bool { return added } +func (f debugFrameCrypto) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "crypto"), + slog.Int64("offset", f.off), + slog.Int("length", len(f.data)), + ) +} + // debugFrameNewToken is a NEW_TOKEN frame. 
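debugAckRanges.AppendJSON above turns the package's half-open packet-number ranges into qlog's closed ranges, collapsing single-packet ranges to one element. A standalone sketch of the same conversion follows; pnRange is a hypothetical stand-in for i64range[packetNumber], and the example input reproduces the [[0,15],[17],[48,63]] output that TestFrameScaledAck later in this patch expects.

package main

import (
	"fmt"
	"strconv"
)

type pnRange struct{ start, end int64 } // half-open: [start, end)

func appendAckRangesJSON(b []byte, ranges []pnRange) []byte {
	b = append(b, '[')
	for i, r := range ranges {
		start, end := r.start, r.end-1 // qlog ranges are closed on both sides
		if i != 0 {
			b = append(b, ',')
		}
		b = append(b, '[')
		b = strconv.AppendInt(b, start, 10)
		if start != end {
			b = append(b, ',')
			b = strconv.AppendInt(b, end, 10)
		}
		b = append(b, ']')
	}
	return append(b, ']')
}

func main() {
	ranges := []pnRange{{0x00, 0x10}, {0x11, 0x12}, {0x30, 0x40}}
	fmt.Println(string(appendAckRangesJSON(nil, ranges))) // [[0,15],[17],[48,63]]
}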
type debugFrameNewToken struct { token []byte @@ -224,6 +320,13 @@ func (f debugFrameNewToken) write(w *packetWriter) bool { return w.appendNewTokenFrame(f.token) } +func (f debugFrameNewToken) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "new_token"), + slogHexstring("token", f.token), + ) +} + // debugFrameStream is a STREAM frame. type debugFrameStream struct { id streamID @@ -251,6 +354,20 @@ func (f debugFrameStream) write(w *packetWriter) bool { return added } +func (f debugFrameStream) LogValue() slog.Value { + var fin slog.Attr + if f.fin { + fin = slog.Bool("fin", true) + } + return slog.GroupValue( + slog.String("frame_type", "stream"), + slog.Uint64("stream_id", uint64(f.id)), + slog.Int64("offset", f.off), + slog.Int("length", len(f.data)), + fin, + ) +} + // debugFrameMaxData is a MAX_DATA frame. type debugFrameMaxData struct { max int64 @@ -269,6 +386,13 @@ func (f debugFrameMaxData) write(w *packetWriter) bool { return w.appendMaxDataFrame(f.max) } +func (f debugFrameMaxData) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "max_data"), + slog.Int64("maximum", f.max), + ) +} + // debugFrameMaxStreamData is a MAX_STREAM_DATA frame. type debugFrameMaxStreamData struct { id streamID @@ -288,6 +412,14 @@ func (f debugFrameMaxStreamData) write(w *packetWriter) bool { return w.appendMaxStreamDataFrame(f.id, f.max) } +func (f debugFrameMaxStreamData) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "max_stream_data"), + slog.Uint64("stream_id", uint64(f.id)), + slog.Int64("maximum", f.max), + ) +} + // debugFrameMaxStreams is a MAX_STREAMS frame. type debugFrameMaxStreams struct { streamType streamType @@ -307,6 +439,14 @@ func (f debugFrameMaxStreams) write(w *packetWriter) bool { return w.appendMaxStreamsFrame(f.streamType, f.max) } +func (f debugFrameMaxStreams) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "max_streams"), + slog.String("stream_type", f.streamType.qlogString()), + slog.Int64("maximum", f.max), + ) +} + // debugFrameDataBlocked is a DATA_BLOCKED frame. type debugFrameDataBlocked struct { max int64 @@ -325,6 +465,13 @@ func (f debugFrameDataBlocked) write(w *packetWriter) bool { return w.appendDataBlockedFrame(f.max) } +func (f debugFrameDataBlocked) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "data_blocked"), + slog.Int64("limit", f.max), + ) +} + // debugFrameStreamDataBlocked is a STREAM_DATA_BLOCKED frame. type debugFrameStreamDataBlocked struct { id streamID @@ -344,6 +491,14 @@ func (f debugFrameStreamDataBlocked) write(w *packetWriter) bool { return w.appendStreamDataBlockedFrame(f.id, f.max) } +func (f debugFrameStreamDataBlocked) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "stream_data_blocked"), + slog.Uint64("stream_id", uint64(f.id)), + slog.Int64("limit", f.max), + ) +} + // debugFrameStreamsBlocked is a STREAMS_BLOCKED frame. type debugFrameStreamsBlocked struct { streamType streamType @@ -363,6 +518,14 @@ func (f debugFrameStreamsBlocked) write(w *packetWriter) bool { return w.appendStreamsBlockedFrame(f.streamType, f.max) } +func (f debugFrameStreamsBlocked) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "streams_blocked"), + slog.String("stream_type", f.streamType.qlogString()), + slog.Int64("limit", f.max), + ) +} + // debugFrameNewConnectionID is a NEW_CONNECTION_ID frame. 
type debugFrameNewConnectionID struct { seq int64 @@ -384,6 +547,16 @@ func (f debugFrameNewConnectionID) write(w *packetWriter) bool { return w.appendNewConnectionIDFrame(f.seq, f.retirePriorTo, f.connID, f.token) } +func (f debugFrameNewConnectionID) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "new_connection_id"), + slog.Int64("sequence_number", f.seq), + slog.Int64("retire_prior_to", f.retirePriorTo), + slogHexstring("connection_id", f.connID), + slogHexstring("stateless_reset_token", f.token[:]), + ) +} + // debugFrameRetireConnectionID is a NEW_CONNECTION_ID frame. type debugFrameRetireConnectionID struct { seq int64 @@ -402,6 +575,13 @@ func (f debugFrameRetireConnectionID) write(w *packetWriter) bool { return w.appendRetireConnectionIDFrame(f.seq) } +func (f debugFrameRetireConnectionID) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "retire_connection_id"), + slog.Int64("sequence_number", f.seq), + ) +} + // debugFramePathChallenge is a PATH_CHALLENGE frame. type debugFramePathChallenge struct { data uint64 @@ -420,6 +600,13 @@ func (f debugFramePathChallenge) write(w *packetWriter) bool { return w.appendPathChallengeFrame(f.data) } +func (f debugFramePathChallenge) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "path_challenge"), + slog.String("data", fmt.Sprintf("%016x", f.data)), + ) +} + // debugFramePathResponse is a PATH_RESPONSE frame. type debugFramePathResponse struct { data uint64 @@ -438,6 +625,13 @@ func (f debugFramePathResponse) write(w *packetWriter) bool { return w.appendPathResponseFrame(f.data) } +func (f debugFramePathResponse) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "path_response"), + slog.String("data", fmt.Sprintf("%016x", f.data)), + ) +} + // debugFrameConnectionCloseTransport is a CONNECTION_CLOSE frame carrying a transport error. type debugFrameConnectionCloseTransport struct { code transportError @@ -465,6 +659,15 @@ func (f debugFrameConnectionCloseTransport) write(w *packetWriter) bool { return w.appendConnectionCloseTransportFrame(f.code, f.frameType, f.reason) } +func (f debugFrameConnectionCloseTransport) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "connection_close"), + slog.String("error_space", "transport"), + slog.Uint64("error_code_value", uint64(f.code)), + slog.String("reason", f.reason), + ) +} + // debugFrameConnectionCloseApplication is a CONNECTION_CLOSE frame carrying an application error. type debugFrameConnectionCloseApplication struct { code uint64 @@ -488,6 +691,15 @@ func (f debugFrameConnectionCloseApplication) write(w *packetWriter) bool { return w.appendConnectionCloseApplicationFrame(f.code, f.reason) } +func (f debugFrameConnectionCloseApplication) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "connection_close"), + slog.String("error_space", "application"), + slog.Uint64("error_code_value", uint64(f.code)), + slog.String("reason", f.reason), + ) +} + // debugFrameHandshakeDone is a HANDSHAKE_DONE frame. 
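Each debugFrame type now implements slog.LogValuer, returning a slog.GroupValue whose attribute names match the qlog frame fields; optional fields such as fin are emitted as a zero slog.Attr, which handlers are expected to ignore. A standalone sketch of the pattern follows, using a hypothetical frame type and the standard library JSON handler instead of the qlog one.

package main

import (
	"log/slog"
	"os"
)

// streamFrame is a hypothetical frame following the same LogValue pattern as
// debugFrameStream in this patch.
type streamFrame struct {
	id     uint64
	off    int64
	length int
	fin    bool
}

func (f streamFrame) LogValue() slog.Value {
	var fin slog.Attr // zero Attr: conforming handlers drop it
	if f.fin {
		fin = slog.Bool("fin", true)
	}
	return slog.GroupValue(
		slog.String("frame_type", "stream"),
		slog.Uint64("stream_id", f.id),
		slog.Int64("offset", f.off),
		slog.Int("length", f.length),
		fin,
	)
}

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	// The handler resolves the LogValuer, so the frame appears as structured
	// fields, e.g. "frame":{"frame_type":"stream","stream_id":100,...}.
	log.Info("transport:packet_sent",
		slog.Any("frame", streamFrame{id: 100, off: 4, length: 3, fin: true}))
}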
type debugFrameHandshakeDone struct{} @@ -502,3 +714,9 @@ func (f debugFrameHandshakeDone) String() string { func (f debugFrameHandshakeDone) write(w *packetWriter) bool { return w.appendHandshakeDoneFrame() } + +func (f debugFrameHandshakeDone) LogValue() slog.Value { + return slog.GroupValue( + slog.String("frame_type", "handshake_done"), + ) +} diff --git a/internal/quic/loss.go b/internal/quic/loss.go index 4a0767bd0..a59081fd5 100644 --- a/internal/quic/loss.go +++ b/internal/quic/loss.go @@ -50,6 +50,9 @@ type lossState struct { // https://www.rfc-editor.org/rfc/rfc9000#section-8-2 antiAmplificationLimit int + // Count of non-ack-eliciting packets (ACKs) sent since the last ack-eliciting one. + consecutiveNonAckElicitingPackets int + rtt rttState pacer pacerState cc *ccReno @@ -192,6 +195,11 @@ func (c *lossState) packetSent(now time.Time, space numberSpace, sent *sentPacke } c.scheduleTimer(now) } + if sent.ackEliciting { + c.consecutiveNonAckElicitingPackets = 0 + } else { + c.consecutiveNonAckElicitingPackets++ + } } // datagramReceived records a datagram (not packet!) received from the peer. diff --git a/internal/quic/main_test.go b/internal/quic/main_test.go new file mode 100644 index 000000000..ecd0b1e9f --- /dev/null +++ b/internal/quic/main_test.go @@ -0,0 +1,57 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "bytes" + "fmt" + "os" + "runtime" + "testing" + "time" +) + +func TestMain(m *testing.M) { + defer os.Exit(m.Run()) + + // Look for leaked goroutines. + // + // Checking after every test makes it easier to tell which test is the culprit, + // but checking once at the end is faster and less likely to miss something. + if runtime.GOOS == "js" { + // The js-wasm runtime creates an additional background goroutine. + // Just skip the leak check there. + return + } + start := time.Now() + warned := false + for { + buf := make([]byte, 2<<20) + buf = buf[:runtime.Stack(buf, true)] + leaked := false + for _, g := range bytes.Split(buf, []byte("\n\n")) { + if bytes.Contains(g, []byte("quic.TestMain")) || + bytes.Contains(g, []byte("created by os/signal.Notify")) || + bytes.Contains(g, []byte("gotraceback_test.go")) { + continue + } + leaked = true + } + if !leaked { + break + } + if !warned && time.Since(start) > 1*time.Second { + // Print a warning quickly, in case this is an interactive session. + // Keep waiting until the test times out, in case this is a slow trybot. + fmt.Printf("Tests seem to have leaked some goroutines, still waiting.\n\n") + fmt.Print(string(buf)) + warned = true + } + // Goroutines might still be shutting down. + time.Sleep(1 * time.Millisecond) + } +} diff --git a/internal/quic/packet.go b/internal/quic/packet.go index df589ccca..7a874319d 100644 --- a/internal/quic/packet.go +++ b/internal/quic/packet.go @@ -41,6 +41,22 @@ func (p packetType) String() string { return fmt.Sprintf("unknown packet type %v", byte(p)) } +func (p packetType) qlogString() string { + switch p { + case packetTypeInitial: + return "initial" + case packetType0RTT: + return "0RTT" + case packetTypeHandshake: + return "handshake" + case packetTypeRetry: + return "retry" + case packetType1RTT: + return "1RTT" + } + return "unknown" +} + // Bits set in the first byte of a packet. 
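The new TestMain above checks for leaked goroutines by taking a full stack dump, ignoring known-benign goroutines, and retrying briefly because goroutines may still be shutting down. A standalone sketch of that check follows; leakedGoroutines is a hypothetical helper, whereas the real code filters on quic.TestMain, os/signal.Notify, and gotraceback_test.go.

package main

import (
	"bytes"
	"fmt"
	"runtime"
	"time"
)

// leakedGoroutines reports whether any goroutine other than the one running
// this check shows up in a full stack dump.
func leakedGoroutines() bool {
	buf := make([]byte, 2<<20)
	buf = buf[:runtime.Stack(buf, true)]
	for _, g := range bytes.Split(buf, []byte("\n\n")) {
		if len(bytes.TrimSpace(g)) == 0 || bytes.Contains(g, []byte("leakedGoroutines")) {
			continue // empty chunk, or the goroutine performing the check
		}
		return true
	}
	return false
}

func main() {
	done := make(chan struct{})
	go func() { <-done }() // simulate a goroutine a test forgot to shut down
	time.Sleep(time.Millisecond)
	fmt.Println("leaked while blocked:", leakedGoroutines()) // true
	close(done)

	// Like TestMain, keep re-checking for a while: goroutines may still be exiting.
	for deadline := time.Now().Add(time.Second); leakedGoroutines() && time.Now().Before(deadline); {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("leaked after shutdown:", leakedGoroutines()) // false
}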
const ( headerFormLong = 0x80 // https://www.rfc-editor.org/rfc/rfc9000.html#section-17.2-3.2.1 diff --git a/internal/quic/packet_codec_test.go b/internal/quic/packet_codec_test.go index 7b01bb00d..475e18c1d 100644 --- a/internal/quic/packet_codec_test.go +++ b/internal/quic/packet_codec_test.go @@ -9,8 +9,13 @@ package quic import ( "bytes" "crypto/tls" + "io" + "log/slog" "reflect" "testing" + "time" + + "golang.org/x/net/internal/quic/qlog" ) func TestParseLongHeaderPacket(t *testing.T) { @@ -207,11 +212,13 @@ func TestRoundtripEncodeShortPacket(t *testing.T) { func TestFrameEncodeDecode(t *testing.T) { for _, test := range []struct { s string + j string f debugFrame b []byte truncated []byte }{{ s: "PADDING*1", + j: `{"frame_type":"padding","length":1}`, f: debugFramePadding{ size: 1, }, @@ -221,12 +228,14 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "PING", + j: `{"frame_type":"ping"}`, f: debugFramePing{}, b: []byte{ 0x01, // TYPE(i) = 0x01 }, }, { s: "ACK Delay=10 [0,16) [17,32) [48,64)", + j: `"error: debugFrameAck should not appear as a slog Value"`, f: debugFrameAck{ ackDelay: 10, ranges: []i64range[packetNumber]{ @@ -257,6 +266,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "RESET_STREAM ID=1 Code=2 FinalSize=3", + j: `{"frame_type":"reset_stream","stream_id":1,"final_size":3}`, f: debugFrameResetStream{ id: 1, code: 2, @@ -270,6 +280,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STOP_SENDING ID=1 Code=2", + j: `{"frame_type":"stop_sending","stream_id":1,"error_code":2}`, f: debugFrameStopSending{ id: 1, code: 2, @@ -281,6 +292,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "CRYPTO Offset=1 Length=2", + j: `{"frame_type":"crypto","offset":1,"length":2}`, f: debugFrameCrypto{ off: 1, data: []byte{3, 4}, @@ -299,6 +311,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "NEW_TOKEN Token=0304", + j: `{"frame_type":"new_token","token":"0304"}`, f: debugFrameNewToken{ token: []byte{3, 4}, }, @@ -309,6 +322,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAM ID=1 Offset=0 Length=0", + j: `{"frame_type":"stream","stream_id":1,"offset":0,"length":0}`, f: debugFrameStream{ id: 1, fin: false, @@ -324,6 +338,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAM ID=100 Offset=4 Length=3", + j: `{"frame_type":"stream","stream_id":100,"offset":4,"length":3}`, f: debugFrameStream{ id: 100, fin: false, @@ -346,6 +361,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAM ID=100 FIN Offset=4 Length=3", + j: `{"frame_type":"stream","stream_id":100,"offset":4,"length":3,"fin":true}`, f: debugFrameStream{ id: 100, fin: true, @@ -368,6 +384,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAM ID=1 FIN Offset=100 Length=0", + j: `{"frame_type":"stream","stream_id":1,"offset":100,"length":0,"fin":true}`, f: debugFrameStream{ id: 1, fin: true, @@ -383,6 +400,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "MAX_DATA Max=10", + j: `{"frame_type":"max_data","maximum":10}`, f: debugFrameMaxData{ max: 10, }, @@ -392,6 +410,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "MAX_STREAM_DATA ID=1 Max=10", + j: `{"frame_type":"max_stream_data","stream_id":1,"maximum":10}`, f: debugFrameMaxStreamData{ id: 1, max: 10, @@ -403,6 +422,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "MAX_STREAMS Type=bidi Max=1", + j: `{"frame_type":"max_streams","stream_type":"bidirectional","maximum":1}`, f: debugFrameMaxStreams{ streamType: bidiStream, max: 1, @@ -413,6 +433,7 @@ func 
TestFrameEncodeDecode(t *testing.T) { }, }, { s: "MAX_STREAMS Type=uni Max=1", + j: `{"frame_type":"max_streams","stream_type":"unidirectional","maximum":1}`, f: debugFrameMaxStreams{ streamType: uniStream, max: 1, @@ -423,6 +444,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "DATA_BLOCKED Max=1", + j: `{"frame_type":"data_blocked","limit":1}`, f: debugFrameDataBlocked{ max: 1, }, @@ -432,6 +454,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAM_DATA_BLOCKED ID=1 Max=2", + j: `{"frame_type":"stream_data_blocked","stream_id":1,"limit":2}`, f: debugFrameStreamDataBlocked{ id: 1, max: 2, @@ -443,6 +466,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAMS_BLOCKED Type=bidi Max=1", + j: `{"frame_type":"streams_blocked","stream_type":"bidirectional","limit":1}`, f: debugFrameStreamsBlocked{ streamType: bidiStream, max: 1, @@ -453,6 +477,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "STREAMS_BLOCKED Type=uni Max=1", + j: `{"frame_type":"streams_blocked","stream_type":"unidirectional","limit":1}`, f: debugFrameStreamsBlocked{ streamType: uniStream, max: 1, @@ -463,6 +488,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "NEW_CONNECTION_ID Seq=3 Retire=2 ID=a0a1a2a3 Token=0102030405060708090a0b0c0d0e0f10", + j: `{"frame_type":"new_connection_id","sequence_number":3,"retire_prior_to":2,"connection_id":"a0a1a2a3","stateless_reset_token":"0102030405060708090a0b0c0d0e0f10"}`, f: debugFrameNewConnectionID{ seq: 3, retirePriorTo: 2, @@ -479,6 +505,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "RETIRE_CONNECTION_ID Seq=1", + j: `{"frame_type":"retire_connection_id","sequence_number":1}`, f: debugFrameRetireConnectionID{ seq: 1, }, @@ -488,6 +515,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "PATH_CHALLENGE Data=0123456789abcdef", + j: `{"frame_type":"path_challenge","data":"0123456789abcdef"}`, f: debugFramePathChallenge{ data: 0x0123456789abcdef, }, @@ -497,6 +525,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "PATH_RESPONSE Data=0123456789abcdef", + j: `{"frame_type":"path_response","data":"0123456789abcdef"}`, f: debugFramePathResponse{ data: 0x0123456789abcdef, }, @@ -506,6 +535,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: `CONNECTION_CLOSE Code=INTERNAL_ERROR FrameType=2 Reason="oops"`, + j: `{"frame_type":"connection_close","error_space":"transport","error_code_value":1,"reason":"oops"}`, f: debugFrameConnectionCloseTransport{ code: 1, frameType: 2, @@ -520,6 +550,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: `CONNECTION_CLOSE AppCode=1 Reason="oops"`, + j: `{"frame_type":"connection_close","error_space":"application","error_code_value":1,"reason":"oops"}`, f: debugFrameConnectionCloseApplication{ code: 1, reason: "oops", @@ -532,6 +563,7 @@ func TestFrameEncodeDecode(t *testing.T) { }, }, { s: "HANDSHAKE_DONE", + j: `{"frame_type":"handshake_done"}`, f: debugFrameHandshakeDone{}, b: []byte{ 0x1e, // Type (i) = 0x1e, @@ -554,6 +586,9 @@ func TestFrameEncodeDecode(t *testing.T) { if got, want := test.f.String(), test.s; got != want { t.Errorf("frame.String():\ngot %q\nwant %q", got, want) } + if got, want := frameJSON(test.f), test.j; got != want { + t.Errorf("frame.LogValue():\ngot %q\nwant %q", got, want) + } // Try encoding the frame into too little space. 
// Most frames will result in an error; some (like STREAM frames) will truncate @@ -579,6 +614,42 @@ func TestFrameEncodeDecode(t *testing.T) { } } +func TestFrameScaledAck(t *testing.T) { + for _, test := range []struct { + j string + f debugFrameScaledAck + }{{ + j: `{"frame_type":"ack","acked_ranges":[[0,15],[17],[48,63]],"ack_delay":10.000000}`, + f: debugFrameScaledAck{ + ackDelay: 10 * time.Millisecond, + ranges: []i64range[packetNumber]{ + {0x00, 0x10}, + {0x11, 0x12}, + {0x30, 0x40}, + }, + }, + }} { + if got, want := frameJSON(test.f), test.j; got != want { + t.Errorf("frame.LogValue():\ngot %q\nwant %q", got, want) + } + } +} + +func frameJSON(f slog.LogValuer) string { + var buf bytes.Buffer + h := qlog.NewJSONHandler(qlog.HandlerOptions{ + Level: QLogLevelFrame, + NewTrace: func(info qlog.TraceInfo) (io.WriteCloser, error) { + return nopCloseWriter{&buf}, nil + }, + }) + // Log the frame, and then trim out everything but the frame from the log. + slog.New(h).Info("message", slog.Any("frame", f)) + _, b, _ := bytes.Cut(buf.Bytes(), []byte(`"frame":`)) + b = bytes.TrimSuffix(b, []byte("}}\n")) + return string(b) +} + func TestFrameDecode(t *testing.T) { for _, test := range []struct { desc string diff --git a/internal/quic/packet_writer.go b/internal/quic/packet_writer.go index 0c2b2ee41..b4e54ce4b 100644 --- a/internal/quic/packet_writer.go +++ b/internal/quic/packet_writer.go @@ -47,6 +47,11 @@ func (w *packetWriter) datagram() []byte { return w.b } +// packet returns the size of the current packet. +func (w *packetWriter) packetLen() int { + return len(w.b[w.pktOff:]) + aeadOverhead +} + // payload returns the payload of the current packet. func (w *packetWriter) payload() []byte { return w.b[w.payOff:] diff --git a/internal/quic/qlog.go b/internal/quic/qlog.go index ea53cab1e..82ad92ac8 100644 --- a/internal/quic/qlog.go +++ b/internal/quic/qlog.go @@ -11,6 +11,7 @@ import ( "encoding/hex" "log/slog" "net/netip" + "time" ) // Log levels for qlog events. 
@@ -145,3 +146,109 @@ func (c *Conn) logConnectionClosed() { slog.String("trigger", trigger), ) } + +func (c *Conn) logLongPacketReceived(p longPacket, pkt []byte) { + var frames slog.Attr + if c.logEnabled(QLogLevelFrame) { + frames = c.packetFramesAttr(p.payload) + } + c.log.LogAttrs(context.Background(), QLogLevelPacket, + "transport:packet_received", + slog.Group("header", + slog.String("packet_type", p.ptype.qlogString()), + slog.Uint64("packet_number", uint64(p.num)), + slog.Uint64("flags", uint64(pkt[0])), + slogHexstring("scid", p.srcConnID), + slogHexstring("dcid", p.dstConnID), + ), + slog.Group("raw", + slog.Int("length", len(pkt)), + ), + frames, + ) +} + +func (c *Conn) log1RTTPacketReceived(p shortPacket, pkt []byte) { + var frames slog.Attr + if c.logEnabled(QLogLevelFrame) { + frames = c.packetFramesAttr(p.payload) + } + dstConnID, _ := dstConnIDForDatagram(pkt) + c.log.LogAttrs(context.Background(), QLogLevelPacket, + "transport:packet_received", + slog.Group("header", + slog.String("packet_type", packetType1RTT.qlogString()), + slog.Uint64("packet_number", uint64(p.num)), + slog.Uint64("flags", uint64(pkt[0])), + slogHexstring("dcid", dstConnID), + ), + slog.Group("raw", + slog.Int("length", len(pkt)), + ), + frames, + ) +} + +func (c *Conn) logPacketSent(ptype packetType, pnum packetNumber, src, dst []byte, pktLen int, payload []byte) { + var frames slog.Attr + if c.logEnabled(QLogLevelFrame) { + frames = c.packetFramesAttr(payload) + } + var scid slog.Attr + if len(src) > 0 { + scid = slogHexstring("scid", src) + } + c.log.LogAttrs(context.Background(), QLogLevelPacket, + "transport:packet_sent", + slog.Group("header", + slog.String("packet_type", ptype.qlogString()), + slog.Uint64("packet_number", uint64(pnum)), + scid, + slogHexstring("dcid", dst), + ), + slog.Group("raw", + slog.Int("length", pktLen), + ), + frames, + ) +} + +// packetFramesAttr returns the "frames" attribute containing the frames in a packet. +// We currently pass this as a slog Any containing a []slog.Value, +// where each Value is a debugFrame that implements slog.LogValuer. +// +// This isn't tremendously efficient, but avoids the need to put a JSON encoder +// in the quic package or a frame parser in the qlog package. +func (c *Conn) packetFramesAttr(payload []byte) slog.Attr { + var frames []slog.Value + for len(payload) > 0 { + f, n := parseDebugFrame(payload) + if n < 0 { + break + } + payload = payload[n:] + switch f := f.(type) { + case debugFrameAck: + // The qlog ACK frame contains the ACK Delay field as a duration. + // Interpreting the contents of this field as a duration requires + // knowing the peer's ack_delay_exponent transport parameter, + // and it's possible for us to parse an ACK frame before we've + // received that parameter. + // + // We could plumb connection state down into the frame parser, + // but for now let's minimize the amount of code that needs to + // deal with this and convert the unscaled value into a scaled one here. 
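As the comment above notes, the ACK Delay on the wire is unscaled: it counts units of 2^ack_delay_exponent microseconds, and the exponent is a transport parameter of the peer (default 3) that may not have arrived yet when a frame is parsed. A standalone sketch of the scaling follows; scaleAckDelay is a hypothetical helper, not the package's unscaledAckDelay type.

package main

import (
	"fmt"
	"time"
)

// scaleAckDelay converts a wire-format ACK Delay into a duration using the
// peer's ack_delay_exponent.
func scaleAckDelay(unscaled uint64, ackDelayExponent uint8) time.Duration {
	return time.Duration(unscaled<<ackDelayExponent) * time.Microsecond
}

func main() {
	// With the default exponent of 3, a wire value of 1250 means 10ms.
	fmt.Println(scaleAckDelay(1250, 3))
}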
+			ackDelay := time.Duration(-1)
+			if c.peerAckDelayExponent >= 0 {
+				ackDelay = f.ackDelay.Duration(uint8(c.peerAckDelayExponent))
+			}
+			frames = append(frames, slog.AnyValue(debugFrameScaledAck{
+				ranges:   f.ranges,
+				ackDelay: ackDelay,
+			}))
+		default:
+			frames = append(frames, slog.AnyValue(f))
+		}
+	}
+	return slog.Any("frames", frames)
+}
diff --git a/internal/quic/qlog/json_writer.go b/internal/quic/qlog/json_writer.go
index 50cf33bc5..b2fa3e03e 100644
--- a/internal/quic/qlog/json_writer.go
+++ b/internal/quic/qlog/json_writer.go
@@ -42,38 +42,56 @@ func (w *jsonWriter) writeRecordEnd() {
 	w.mu.Unlock()
 }
 
-// writeAttrsField writes a []slog.Attr as an object field.
-func (w *jsonWriter) writeAttrsField(name string, attrs []slog.Attr) {
-	w.writeName(name)
+func (w *jsonWriter) writeAttrs(attrs []slog.Attr) {
 	w.buf.WriteByte('{')
 	for _, a := range attrs {
+		if a.Key == "" {
+			continue
+		}
 		w.writeAttr(a)
 	}
 	w.buf.WriteByte('}')
 }
 
-// writeAttr writes a slog.Attr as an object field.
 func (w *jsonWriter) writeAttr(a slog.Attr) {
-	v := a.Value.Resolve()
+	w.writeName(a.Key)
+	w.writeValue(a.Value)
+}
+
+// writeAttrsField writes a []slog.Attr as an object field.
+func (w *jsonWriter) writeAttrsField(name string, attrs []slog.Attr) {
+	w.writeName(name)
+	w.writeAttrs(attrs)
+}
+
+func (w *jsonWriter) writeValue(v slog.Value) {
+	v = v.Resolve()
 	switch v.Kind() {
 	case slog.KindAny:
-		w.writeStringField(a.Key, fmt.Sprint(v.Any()))
+		switch v := v.Any().(type) {
+		case []slog.Value:
+			w.writeArray(v)
+		case interface{ AppendJSON([]byte) []byte }:
+			w.buf.Write(v.AppendJSON(w.buf.AvailableBuffer()))
+		default:
+			w.writeString(fmt.Sprint(v))
+		}
 	case slog.KindBool:
-		w.writeBoolField(a.Key, v.Bool())
+		w.writeBool(v.Bool())
 	case slog.KindDuration:
-		w.writeDurationField(a.Key, v.Duration())
+		w.writeDuration(v.Duration())
 	case slog.KindFloat64:
-		w.writeFloat64Field(a.Key, v.Float64())
+		w.writeFloat64(v.Float64())
 	case slog.KindInt64:
-		w.writeInt64Field(a.Key, v.Int64())
+		w.writeInt64(v.Int64())
 	case slog.KindString:
-		w.writeStringField(a.Key, v.String())
+		w.writeString(v.String())
 	case slog.KindTime:
-		w.writeTimeField(a.Key, v.Time())
+		w.writeTime(v.Time())
 	case slog.KindUint64:
-		w.writeUint64Field(a.Key, v.Uint64())
+		w.writeUint64(v.Uint64())
 	case slog.KindGroup:
-		w.writeAttrsField(a.Key, v.Group())
+		w.writeAttrs(v.Group())
 	default:
 		w.writeString("unhandled kind")
 	}
@@ -89,24 +107,41 @@ func (w *jsonWriter) writeName(name string) {
 	w.buf.WriteByte(':')
 }
 
-// writeObject writes an object-valued object field.
-// The function f is called to write the contents.
-func (w *jsonWriter) writeObjectField(name string, f func()) {
-	w.writeName(name)
+func (w *jsonWriter) writeObject(f func()) {
 	w.buf.WriteByte('{')
 	f()
 	w.buf.WriteByte('}')
 }
 
-// writeRawField writes an field with a raw JSON value.
-func (w *jsonWriter) writeRawField(name, v string) {
+// writeObjectField writes an object-valued object field.
+// The function f is called to write the contents.
+func (w *jsonWriter) writeObjectField(name string, f func()) {
 	w.writeName(name)
+	w.writeObject(f)
+}
+
+func (w *jsonWriter) writeArray(vals []slog.Value) {
+	w.buf.WriteByte('[')
+	for i, v := range vals {
+		if i != 0 {
+			w.buf.WriteByte(',')
+		}
+		w.writeValue(v)
+	}
+	w.buf.WriteByte(']')
+}
+
+func (w *jsonWriter) writeRaw(v string) {
 	w.buf.WriteString(v)
 }
 
-// writeBoolField writes a bool-valued object field.
-func (w *jsonWriter) writeBoolField(name string, v bool) {
+// writeRawField writes a field with a raw JSON value.
+func (w *jsonWriter) writeRawField(name, v string) {
 	w.writeName(name)
+	w.writeRaw(v)
+}
+
+func (w *jsonWriter) writeBool(v bool) {
 	if v {
 		w.buf.WriteString("true")
 	} else {
@@ -114,40 +149,66 @@ func (w *jsonWriter) writeBoolField(name string, v bool) {
 	}
 }
 
+// writeBoolField writes a bool-valued object field.
+func (w *jsonWriter) writeBoolField(name string, v bool) {
+	w.writeName(name)
+	w.writeBool(v)
+}
+
+// writeDuration writes a duration as milliseconds.
+func (w *jsonWriter) writeDuration(v time.Duration) {
+	if v < 0 {
+		w.buf.WriteByte('-')
+		v = -v
+	}
+	fmt.Fprintf(&w.buf, "%d.%06d", v.Milliseconds(), v%time.Millisecond)
+}
+
 // writeDurationField writes a millisecond duration-valued object field.
 func (w *jsonWriter) writeDurationField(name string, v time.Duration) {
 	w.writeName(name)
-	fmt.Fprintf(&w.buf, "%d.%06d", v.Milliseconds(), v%time.Millisecond)
+	w.writeDuration(v)
+}
+
+func (w *jsonWriter) writeFloat64(v float64) {
+	w.buf.Write(strconv.AppendFloat(w.buf.AvailableBuffer(), v, 'f', -1, 64))
 }
 
 // writeFloat64Field writes an float64-valued object field.
 func (w *jsonWriter) writeFloat64Field(name string, v float64) {
 	w.writeName(name)
-	w.buf.Write(strconv.AppendFloat(w.buf.AvailableBuffer(), v, 'f', -1, 64))
+	w.writeFloat64(v)
+}
+
+func (w *jsonWriter) writeInt64(v int64) {
+	w.buf.Write(strconv.AppendInt(w.buf.AvailableBuffer(), v, 10))
 }
 
 // writeInt64Field writes an int64-valued object field.
 func (w *jsonWriter) writeInt64Field(name string, v int64) {
 	w.writeName(name)
-	w.buf.Write(strconv.AppendInt(w.buf.AvailableBuffer(), v, 10))
+	w.writeInt64(v)
+}
+
+func (w *jsonWriter) writeUint64(v uint64) {
+	w.buf.Write(strconv.AppendUint(w.buf.AvailableBuffer(), v, 10))
 }
 
 // writeUint64Field writes a uint64-valued object field.
 func (w *jsonWriter) writeUint64Field(name string, v uint64) {
 	w.writeName(name)
-	w.buf.Write(strconv.AppendUint(w.buf.AvailableBuffer(), v, 10))
+	w.writeUint64(v)
 }
 
-// writeStringField writes a string-valued object field.
-func (w *jsonWriter) writeStringField(name, v string) {
-	w.writeName(name)
-	w.writeString(v)
+// writeTime writes a time as milliseconds since the Unix epoch.
+func (w *jsonWriter) writeTime(v time.Time) {
+	fmt.Fprintf(&w.buf, "%d.%06d", v.UnixMilli(), v.Nanosecond()%int(time.Millisecond))
 }
 
 // writeTimeField writes a time-valued object field.
 func (w *jsonWriter) writeTimeField(name string, v time.Time) {
 	w.writeName(name)
-	fmt.Fprintf(&w.buf, "%d.%06d", v.UnixMilli(), v.Nanosecond()%int(time.Millisecond))
+	w.writeTime(v)
 }
 
 func jsonSafeSet(c byte) bool {
@@ -192,3 +253,9 @@ func (w *jsonWriter) writeString(v string) {
 	}
 	w.buf.WriteByte('"')
 }
+
+// writeStringField writes a string-valued object field.
+func (w *jsonWriter) writeStringField(name, v string) {
+	w.writeName(name)
+	w.writeString(v)
+}
diff --git a/internal/quic/qlog/json_writer_test.go b/internal/quic/qlog/json_writer_test.go
index 7ba5e1737..6da556641 100644
--- a/internal/quic/qlog/json_writer_test.go
+++ b/internal/quic/qlog/json_writer_test.go
@@ -124,9 +124,10 @@ func TestJSONWriterBoolField(t *testing.T) {
 func TestJSONWriterDurationField(t *testing.T) {
 	w := newTestJSONWriter()
 	w.writeRecordStart()
-	w.writeDurationField("field", (10*time.Millisecond)+(2*time.Nanosecond))
+	w.writeDurationField("field1", (10*time.Millisecond)+(2*time.Nanosecond))
+	w.writeDurationField("field2", -((10 * time.Millisecond) + (2 * time.Nanosecond)))
 	w.writeRecordEnd()
-	wantJSONRecord(t, w, `{"field":10.000002}`)
+	wantJSONRecord(t, w, `{"field1":10.000002,"field2":-10.000002}`)
 }
 
 func TestJSONWriterFloat64Field(t *testing.T) {
diff --git a/internal/quic/qlog/qlog.go b/internal/quic/qlog/qlog.go
index 0e71d71aa..e54c839f0 100644
--- a/internal/quic/qlog/qlog.go
+++ b/internal/quic/qlog/qlog.go
@@ -180,7 +180,7 @@ func newTraceWriter(opts HandlerOptions, info TraceInfo) (io.WriteCloser, error)
 		if !filepath.IsLocal(filename) {
 			return nil, errors.New("invalid trace filename")
 		}
-		w, err = os.Create(filepath.Join(opts.Dir, filename))
+		w, err = os.OpenFile(filepath.Join(opts.Dir, filename), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0666)
 	} else {
 		err = errors.New("no log destination")
 	}
diff --git a/internal/quic/qlog_test.go b/internal/quic/qlog_test.go
index 119f5d16a..e98b11838 100644
--- a/internal/quic/qlog_test.go
+++ b/internal/quic/qlog_test.go
@@ -55,6 +55,41 @@ func TestQLogHandshake(t *testing.T) {
 	})
 }
 
+func TestQLogPacketFrames(t *testing.T) {
+	qr := &qlogRecord{}
+	tc := newTestConn(t, clientSide, qr.config)
+	tc.handshake()
+	tc.conn.Abort(nil)
+	tc.writeFrames(packetType1RTT, debugFrameConnectionCloseTransport{})
+	tc.advanceToTimer() // let the conn finish draining
+
+	qr.wantEvents(t, jsonEvent{
+		"name": "transport:packet_sent",
+		"data": map[string]any{
+			"header": map[string]any{
+				"packet_type":   "initial",
+				"packet_number": 0,
+				"dcid":          hex.EncodeToString(testLocalConnID(-1)),
+				"scid":          hex.EncodeToString(testLocalConnID(0)),
+			},
+			"frames": []any{
+				map[string]any{"frame_type": "crypto"},
+			},
+		},
+	}, jsonEvent{
+		"name": "transport:packet_received",
+		"data": map[string]any{
+			"header": map[string]any{
+				"packet_type":   "initial",
+				"packet_number": 0,
+				"dcid":          hex.EncodeToString(testLocalConnID(0)),
+				"scid":          hex.EncodeToString(testPeerConnID(0)),
+			},
+			"frames": []any{map[string]any{"frame_type": "crypto"}},
+		},
+	})
+}
+
 func TestQLogConnectionClosedTrigger(t *testing.T) {
 	for _, test := range []struct {
 		trigger string
@@ -137,21 +172,60 @@ func (j jsonEvent) String() string {
 	return string(b)
 }
 
-// eventPartialEqual verifies that every field set in want matches the corresponding field in got.
-// It ignores additional fields in got.
-func eventPartialEqual(got, want jsonEvent) bool {
-	for k := range want {
-		ge, gok := got[k].(map[string]any)
-		we, wok := want[k].(map[string]any)
-		if gok && wok {
-			if !eventPartialEqual(ge, we) {
-				return false
+// jsonPartialEqual compares two JSON structures.
+// It ignores fields not set in want (see below for specifics).
+func jsonPartialEqual(got, want any) (equal bool) {
+	cmpval := func(v any) any {
+		// Map certain types to a common representation.
+		switch v := v.(type) {
+		case int:
+			// JSON uses float64s rather than ints for numbers.
+			// Map int->float64 so we can use integers in expectations.
+			return float64(v)
+		case jsonEvent:
+			return (map[string]any)(v)
+		case []jsonEvent:
+			s := []any{}
+			for _, e := range v {
+				s = append(s, e)
 			}
-	} else {
-		if !reflect.DeepEqual(got[k], want[k]) {
+			return s
+		}
+		return v
+	}
+	got = cmpval(got)
+	want = cmpval(want)
+	if reflect.TypeOf(got) != reflect.TypeOf(want) {
+		return false
+	}
+	switch w := want.(type) {
+	case nil:
+		// Match anything.
+	case map[string]any:
+		// JSON object: Every field in want must match a field in got.
+		g := got.(map[string]any)
+		for k := range w {
+			if !jsonPartialEqual(g[k], w[k]) {
 				return false
 			}
 		}
+	case []any:
+		// JSON slice: Every field in want must match a field in got, in order.
+		// So want=[2,4] matches got=[1,2,3,4] but not [4,2].
+		g := got.([]any)
+		for _, ge := range g {
+			if jsonPartialEqual(ge, w[0]) {
+				w = w[1:]
+				if len(w) == 0 {
+					return true
+				}
+			}
+		}
+		return false
+	default:
+		if !reflect.DeepEqual(got, want) {
+			return false
+		}
 	}
 	return true
 }
@@ -179,6 +253,7 @@ func (q *qlogRecord) Close() error { return nil }
 // config may be passed to newTestConn to configure the conn to use this logger.
 func (q *qlogRecord) config(c *Config) {
 	c.QLogLogger = slog.New(qlog.NewJSONHandler(qlog.HandlerOptions{
+		Level: QLogLevelFrame,
 		NewTrace: func(info qlog.TraceInfo) (io.WriteCloser, error) {
 			return q, nil
 		},
@@ -189,14 +264,7 @@ func (q *qlogRecord) config(c *Config) {
 func (q *qlogRecord) wantEvents(t *testing.T, want ...jsonEvent) {
 	t.Helper()
 	got := q.ev
-	unseen := want
-	for _, g := range got {
-		if eventPartialEqual(g, unseen[0]) {
-			unseen = unseen[1:]
-			if len(unseen) == 0 {
-				return
-			}
-		}
+	if !jsonPartialEqual(got, want) {
+		t.Fatalf("got events:\n%v\n\nwant events:\n%v", got, want)
 	}
-	t.Fatalf("got events:\n%v\n\nwant events:\n%v", got, want)
 }
diff --git a/internal/quic/quic.go b/internal/quic/quic.go
index e4d0d77c7..3e62d7cd9 100644
--- a/internal/quic/quic.go
+++ b/internal/quic/quic.go
@@ -144,6 +144,17 @@ const (
 	streamTypeCount
 )
 
+func (s streamType) qlogString() string {
+	switch s {
+	case bidiStream:
+		return "bidirectional"
+	case uniStream:
+		return "unidirectional"
+	default:
+		return "BUG"
+	}
+}
+
 func (s streamType) String() string {
 	switch s {
 	case bidiStream:
diff --git a/internal/quic/retry_test.go b/internal/quic/retry_test.go
index 4a21a4ca1..8f36e1bd3 100644
--- a/internal/quic/retry_test.go
+++ b/internal/quic/retry_test.go
@@ -533,7 +533,9 @@ func initialClientCrypto(t *testing.T, e *testEndpoint, p transportParameters) [
 	tlsClient := tls.QUICClient(config)
 	tlsClient.SetTransportParameters(marshalTransportParameters(p))
 	tlsClient.Start(context.Background())
-	//defer tlsClient.Close()
+	t.Cleanup(func() {
+		tlsClient.Close()
+	})
 	e.peerTLSConn = tlsClient
 	var data []byte
 	for {
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 36c80f6af..fb9c1cf3c 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 )
 
 type Stream struct {
@@ -105,6 +106,11 @@ const (
 	dataQueue // streamsState.queueData
 )
 
+// streamResetByConnClose is assigned to Stream.inresetcode to indicate that a stream
+// was implicitly reset when the connection closed. It's out of the range of
+// possible reset codes the peer can send.
+const streamResetByConnClose = math.MaxInt64
+
 // wantQueue returns the send queue the stream should be on.
 func (s streamState) wantQueue() streamQueue {
 	switch {
@@ -347,7 +353,15 @@ func (s *Stream) CloseContext(ctx context.Context) error {
 	}
 	s.CloseWrite()
 	// TODO: Return code from peer's RESET_STREAM frame?
-	return s.conn.waitOnDone(ctx, s.outdone)
+	if err := s.conn.waitOnDone(ctx, s.outdone); err != nil {
+		return err
+	}
+	s.outgate.lock()
+	defer s.outUnlock()
+	if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
+		return nil
+	}
+	return errors.New("stream reset")
 }
 
 // CloseRead aborts reads on the stream.
@@ -437,6 +451,31 @@ func (s *Stream) resetInternal(code uint64, userClosed bool) {
 	s.outblocked.clear()
 }
 
+// connHasClosed indicates the stream's conn has closed.
+func (s *Stream) connHasClosed() {
+	// If we're in the closing state, the user closed the conn.
+	// Otherwise, the peer initiated the close.
+	// This only matters for the error we're going to return from stream operations.
+	localClose := s.conn.lifetime.state == connStateClosing
+
+	s.ingate.lock()
+	if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
+		if localClose {
+			s.inclosed.set()
+		} else {
+			s.inresetcode = streamResetByConnClose
+		}
+	}
+	s.inUnlock()
+
+	s.outgate.lock()
+	if localClose {
+		s.outclosed.set()
+	}
+	s.outreset.set()
+	s.outUnlock()
+}
+
 // inUnlock unlocks s.ingate.
 // It sets the gate condition if reads from s will not block.
 // If s has receive-related frames to write or if both directions
diff --git a/internal/quic/stream_limits.go b/internal/quic/stream_limits.go
index 2f42cf418..71cc29135 100644
--- a/internal/quic/stream_limits.go
+++ b/internal/quic/stream_limits.go
@@ -21,7 +21,7 @@ import (
 type localStreamLimits struct {
 	gate   gate
 	max    int64 // peer-provided MAX_STREAMS
-	opened int64 // number of streams opened by us
+	opened int64 // number of streams opened by us, -1 when conn is closed
 }
 
 func (lim *localStreamLimits) init() {
@@ -34,10 +34,21 @@ func (lim *localStreamLimits) open(ctx context.Context, c *Conn) (num int64, err
 	if err := lim.gate.waitAndLock(ctx, c.testHooks); err != nil {
 		return 0, err
 	}
-	n := lim.opened
+	if lim.opened < 0 {
+		lim.gate.unlock(true)
+		return 0, errConnClosed
+	}
+	num = lim.opened
 	lim.opened++
 	lim.gate.unlock(lim.opened < lim.max)
-	return n, nil
+	return num, nil
+}
+
+// connHasClosed indicates the connection has been closed, locally or by the peer.
+func (lim *localStreamLimits) connHasClosed() {
+	lim.gate.lock()
+	lim.opened = -1
+	lim.gate.unlock(true)
 }
 
 // setMax sets the MAX_STREAMS provided by the peer.
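Note: a hedged sketch of what the CloseContext and connHasClosed changes above mean for a caller. closeAndReport is a hypothetical helper (it would have to live inside package quic, for example in a test file); it is not part of this patch.

// Sketch only: CloseContext now returns nil only when every byte written to the
// stream was acknowledged before it closed; otherwise it reports an error, for
// example when the stream was reset, or implicitly reset because the conn closed.
func closeAndReport(ctx context.Context, s *Stream) error {
	if err := s.CloseContext(ctx); err != nil {
		return fmt.Errorf("stream did not close cleanly: %w", err)
	}
	// All sent data was acknowledged by the peer.
	return nil
}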
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index 93c8839ff..00e392dba 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -1047,11 +1047,13 @@ func TestStreamCloseUnblocked(t *testing.T) {
 	for _, test := range []struct {
 		name    string
 		unblock func(tc *testConn, s *Stream)
+		success bool
 	}{{
 		name: "data received",
 		unblock: func(tc *testConn, s *Stream) {
 			tc.writeAckForAll()
 		},
+		success: true,
 	}, {
 		name: "stop sending received",
 		unblock: func(tc *testConn, s *Stream) {
@@ -1094,7 +1096,13 @@ func TestStreamCloseUnblocked(t *testing.T) {
 			t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
 		}
 		test.unblock(tc, s)
-		if _, err := closing.result(); err != nil {
+		_, err := closing.result()
+		switch {
+		case err == errNotDone:
+			t.Fatalf("s.CloseContext() still blocking; want it to have returned")
+		case err == nil && !test.success:
+			t.Fatalf("s.CloseContext() = nil, want error")
+		case err != nil && test.success:
 			t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
 		}
 	})
@@ -1390,31 +1398,41 @@ func newTestConnAndStream(t *testing.T, side connSide, sside streamSide, styp st
 
 func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
 	t.Helper()
-	ctx := canceledContext()
 	tc := newTestConn(t, side, opts...)
 	tc.handshake()
 	tc.ignoreFrame(frameTypeAck)
+	return tc, newLocalStream(t, tc, styp)
+}
+
+func newLocalStream(t *testing.T, tc *testConn, styp streamType) *Stream {
+	t.Helper()
+	ctx := canceledContext()
 	s, err := tc.conn.newLocalStream(ctx, styp)
 	if err != nil {
 		t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
 	}
-	return tc, s
+	return s
 }
 
 func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
 	t.Helper()
-	ctx := canceledContext()
 	tc := newTestConn(t, side, opts...)
 	tc.handshake()
 	tc.ignoreFrame(frameTypeAck)
+	return tc, newRemoteStream(t, tc, styp)
+}
+
+func newRemoteStream(t *testing.T, tc *testConn, styp streamType) *Stream {
+	t.Helper()
+	ctx := canceledContext()
 	tc.writeFrames(packetType1RTT, debugFrameStream{
-		id: newStreamID(side.peer(), styp, 0),
+		id: newStreamID(tc.conn.side.peer(), styp, 0),
 	})
 	s, err := tc.conn.AcceptStream(ctx)
 	if err != nil {
 		t.Fatalf("conn.AcceptStream() = %v", err)
 	}
-	return tc, s
+	return s
 }
 
 // permissiveTransportParameters may be passed as an option to newTestConn.
diff --git a/internal/quic/tls_test.go b/internal/quic/tls_test.go
index 14f74a00a..9c1dd364e 100644
--- a/internal/quic/tls_test.go
+++ b/internal/quic/tls_test.go
@@ -10,7 +10,6 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	"errors"
-	"reflect"
 	"testing"
 	"time"
 )
@@ -56,7 +55,7 @@ func (tc *testConn) handshake() {
 			fillCryptoFrames(want, tc.cryptoDataOut)
 			i++
 		}
-		if !reflect.DeepEqual(got, want) {
+		if !datagramEqual(got, want) {
 			t.Fatalf("dgram %v:\ngot %v\n\nwant %v", i, got, want)
 		}
 		if i >= len(dgrams) {
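Note: splitting newLocalStream and newRemoteStream out of the newTestConnAnd*Stream helpers above makes it possible for one test to open several streams on the same conn. A hedged usage sketch follows; TestTwoStreamsOneConn is hypothetical and not added by this patch.

// Sketch only: exercises both a locally opened and a peer-opened stream on a
// single testConn, using the helpers factored out above.
func TestTwoStreamsOneConn(t *testing.T) {
	tc := newTestConn(t, clientSide, permissiveTransportParameters)
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)
	local := newLocalStream(t, tc, bidiStream)  // stream we open
	remote := newRemoteStream(t, tc, uniStream) // stream the peer opens
	_, _ = local, remote
}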