Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal/ascii"
28 "net/textproto"
29 "net/url"
30 "reflect"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35 _ "unsafe"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http/httpproxy"
39 )
40
41
42
43
44
45
// DefaultTransport is a package-level RoundTripper backed by a *Transport.
// It looks up proxies with ProxyFromEnvironment, dials through a net.Dialer
// configured with a 30s Timeout and 30s KeepAlive, sets ForceAttemptHTTP2,
// caps the idle pool at 100 connections (MaxIdleConns) with a 90s
// IdleConnTimeout, and uses a 10s TLSHandshakeTimeout and a 1s
// ExpectContinueTimeout.
46 var DefaultTransport RoundTripper = &Transport{
47 Proxy: ProxyFromEnvironment,
48 DialContext: defaultTransportDialContext(&net.Dialer{
49 Timeout: 30 * time.Second,
50 KeepAlive: 30 * time.Second,
51 }),
52 ForceAttemptHTTP2: true,
53 MaxIdleConns: 100,
54 IdleConnTimeout: 90 * time.Second,
55 TLSHandshakeTimeout: 10 * time.Second,
56 ExpectContinueTimeout: 1 * time.Second,
57 }
58
59
60
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// maxIdleConnsPerHost falls back to when Transport.MaxIdleConnsPerHost is zero.
61 const DefaultMaxIdleConnsPerHost = 2
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Transport holds the connection-pool state and the user-facing configuration
// used by roundTrip. The unexported fields are grouped under the mutex that
// the methods in this file hold while touching them.
97 type Transport struct {
// idleMu is held by tryPutIdleConn, queueForIdleConn, removeIdleConn and
// CloseIdleConnections while they touch the four fields below.
98 idleMu sync.Mutex
// closeIdle is set by CloseIdleConnections and cleared by queueForIdleConn.
99 closeIdle bool
// idleConn lists the pooled idle connections per connect-method key.
100 idleConn map[connectMethodKey][]*persistConn
// idleConnWait lists the wantConns waiting for an idle connection, per key.
101 idleConnWait map[connectMethodKey]wantConnQueue
// idleLRU tracks idle connections so the oldest can be evicted when
// MaxIdleConns is exceeded.
102 idleLRU connLRU
103
// reqMu guards reqCanceler, which maps in-flight requests to the cancel
// function installed by prepareTransportCancel (used by CancelRequest).
104 reqMu sync.Mutex
105 reqCanceler map[*Request]context.CancelCauseFunc
106
// altMu serializes RegisterProtocol; altProto stores a
// map[string]RoundTripper that readers obtain with Load.
107 altMu sync.Mutex
108 altProto atomic.Value
109
// connsPerHostMu is held while touching the per-key connection counts, the
// queue of dials waiting for a free slot, and the dials currently running.
110 connsPerHostMu sync.Mutex
111 connsPerHost map[connectMethodKey]int
112 connsPerHostWait map[connectMethodKey]wantConnQueue
113 dialsInProgress wantConnQueue
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Proxy, when non-nil, is called by connectMethodForRequest to obtain the
// proxy URL for a request; its error is returned to the caller.
129 Proxy func(*Request) (*url.URL, error)
130
131
132
133
// OnProxyConnectResponse is copied by Clone; its call site is not in this
// part of the file. NOTE(review): judging by the signature it is a hook run
// on the proxy's CONNECT response - confirm.
134 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
135
136
137
138
139
140
141
142
143
// DialContext, when non-nil, is what dial uses to open connections. It takes
// precedence over Dial; if both are nil a zero net.Dialer is used.
144 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
145
146
147
148
149
150
151
152
153
154
155
// Dial is used by dial only when DialContext is nil.
156 Dial func(network, addr string) (net.Conn, error)
157
158
159
160
161
162
163
164
165
166
167
// DialTLSContext, when non-nil, is what customDialTLS calls; it takes
// precedence over DialTLS.
168 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
169
170
171
172
173
174
175
// DialTLS is called by customDialTLS when DialTLSContext is nil.
176 DialTLS func(network, addr string) (net.Conn, error)
177
178
179
180
181
// TLSClientConfig is cloned by addTLS for each handshake; Clone deep-copies
// it when non-nil.
182 TLSClientConfig *tls.Config
183
184
185
// TLSHandshakeTimeout bounds the handshake in addTLS; zero means no timer.
186 TLSHandshakeTimeout time.Duration
187
188
189
190
191
192
// DisableKeepAlives makes tryPutIdleConn refuse to pool connections and
// queueForIdleConn skip the idle pool entirely.
193 DisableKeepAlives bool
194
195
196
197
198
199
200
201
202
// DisableCompression is copied by Clone; it is not otherwise read in this
// part of the file.
203 DisableCompression bool
204
205
206
// MaxIdleConns caps the total number of pooled idle connections; zero means
// no cap (see tryPutIdleConn).
207 MaxIdleConns int
208
209
210
211
// MaxIdleConnsPerHost caps idle connections per key. Zero selects
// DefaultMaxIdleConnsPerHost; a negative value makes tryPutIdleConn behave
// as if keep-alives were disabled.
212 MaxIdleConnsPerHost int
213
214
215
216
217
218
// MaxConnsPerHost caps connections per key; when the cap is reached
// queueForDial parks the request. Values <= 0 mean no cap.
219 MaxConnsPerHost int
220
221
222
223
224
// IdleConnTimeout is how long a pooled connection may sit idle before it is
// closed; values <= 0 disable the idle timer.
225 IdleConnTimeout time.Duration
226
227
228
229
230
// ResponseHeaderTimeout is copied by Clone; it is not otherwise read in this
// part of the file.
231 ResponseHeaderTimeout time.Duration
232
233
234
235
236
237
238
239
// ExpectContinueTimeout is copied by Clone; it is not otherwise read in this
// part of the file.
240 ExpectContinueTimeout time.Duration
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// TLSNextProto maps a protocol name to a constructor for an alternate
// RoundTripper over an established TLS connection. The presence of an "h2"
// entry is consulted by onceSetNextProtoDefaults and protocols.
255 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
256
257
258
259
// ProxyConnectHeader is deep-copied by Clone; its use is not in this part of
// the file.
260 ProxyConnectHeader Header
261
262
263
264
265
266
267
268
// GetProxyConnectHeader is copied by Clone; its call site is not in this
// part of the file.
269 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
270
271
272
273
274
275
// MaxResponseHeaderBytes limits response header size; zero or negative
// selects the 10 MiB default (see maxHeaderResponseSize).
276 MaxResponseHeaderBytes int64
277
278
279
280
// WriteBufferSize is the write buffer size; zero or negative selects 4 KiB.
281 WriteBufferSize int
282
283
284
285
// ReadBufferSize is the read buffer size; zero or negative selects 4 KiB.
286 ReadBufferSize int
287
288
289
// nextProtoOnce runs onceSetNextProtoDefaults exactly once. That function
// records tlsNextProtoWasNil and may set h2transport; h3transport is set by
// RegisterProtocol for the "http/3" scheme.
290 nextProtoOnce sync.Once
291 h2transport h2Transport
292 h3transport dialClientConner
293 tlsNextProtoWasNil bool
294
295
296
297
298
299
// ForceAttemptHTTP2 keeps HTTP/2 enabled in protocols() even when a custom
// dialer or TLS config is set.
300 ForceAttemptHTTP2 bool
301
302
// HTTP2 is copied by value in Clone; it is not otherwise read in this part
// of the file.
303 HTTP2 *HTTP2Config
304
305
306
307
308
309
310
311
312
// Protocols, when non-nil, is returned verbatim by protocols() and overrides
// the defaults derived from the other fields.
313 Protocols *Protocols
314 }
315
316 func (t *Transport) writeBufferSize() int {
317 if t.WriteBufferSize > 0 {
318 return t.WriteBufferSize
319 }
320 return 4 << 10
321 }
322
323 func (t *Transport) readBufferSize() int {
324 if t.ReadBufferSize > 0 {
325 return t.ReadBufferSize
326 }
327 return 4 << 10
328 }
329
330 func (t *Transport) maxHeaderResponseSize() int64 {
331 if t.MaxResponseHeaderBytes > 0 {
332 return t.MaxResponseHeaderBytes
333 }
334 return 10 << 20
335 }
336
337
338 func (t *Transport) Clone() *Transport {
339 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
340 t2 := &Transport{
341 Proxy: t.Proxy,
342 OnProxyConnectResponse: t.OnProxyConnectResponse,
343 DialContext: t.DialContext,
344 Dial: t.Dial,
345 DialTLS: t.DialTLS,
346 DialTLSContext: t.DialTLSContext,
347 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
348 DisableKeepAlives: t.DisableKeepAlives,
349 DisableCompression: t.DisableCompression,
350 MaxIdleConns: t.MaxIdleConns,
351 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
352 MaxConnsPerHost: t.MaxConnsPerHost,
353 IdleConnTimeout: t.IdleConnTimeout,
354 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
355 ExpectContinueTimeout: t.ExpectContinueTimeout,
356 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
357 GetProxyConnectHeader: t.GetProxyConnectHeader,
358 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
359 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
360 WriteBufferSize: t.WriteBufferSize,
361 ReadBufferSize: t.ReadBufferSize,
362 }
363 if t.TLSClientConfig != nil {
364 t2.TLSClientConfig = t.TLSClientConfig.Clone()
365 }
366 if t.HTTP2 != nil {
367 t2.HTTP2 = &HTTP2Config{}
368 *t2.HTTP2 = *t.HTTP2
369 }
370 if t.Protocols != nil {
371 t2.Protocols = &Protocols{}
372 *t2.Protocols = *t.Protocols
373 }
374 if !t.tlsNextProtoWasNil {
375 npm := maps.Clone(t.TLSNextProto)
376 if npm == nil {
377 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
378 }
379 t2.TLSNextProto = npm
380 }
381 return t2
382 }
383
// dialClientConner is the interface a RoundTripper must satisfy to be
// registered under the "http/3" scheme: RegisterProtocol panics if the value
// does not implement it, and otherwise stores it in Transport.h3transport.
384 type dialClientConner interface {
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
// DialClientConn takes a context, a target address, an optional proxy URL
// and an internalStateHook callback, and returns a RoundTripper or an error.
// NOTE(review): the exact contract (when the hook fires, what the returned
// RoundTripper is bound to) is not visible in this part of the file.
411 DialClientConn(ctx context.Context, address string, proxy *url.URL, internalStateHook func()) (RoundTripper, error)
412 }
413
414
415
416
417
418
419
// h2Transport is the only capability this file needs from the HTTP/2
// transport stored in Transport.h2transport: CloseIdleConnections forwards
// to it.
420 type h2Transport interface {
421 CloseIdleConnections()
422 }
423
424 func (t *Transport) hasCustomTLSDialer() bool {
425 return t.DialTLS != nil || t.DialTLSContext != nil
426 }
427
// http2client is the GODEBUG setting "http2client". A value of "0" makes
// onceSetNextProtoDefaults skip HTTP/2 setup and keeps protocols() from
// enabling HTTP/2 by default.
428 var http2client = godebug.New("http2client")
429
430
431
// onceSetNextProtoDefaults performs the one-time HTTP/2 setup for t. It is
// run through t.nextProtoOnce by roundTrip, Clone and CloseIdleConnections.
// It records whether TLSNextProto was nil and, unless HTTP/2 is disabled or
// already provided, configures the bundled HTTP/2 transport.
432 func (t *Transport) onceSetNextProtoDefaults() {
// Remember the caller's original choice; Clone relies on this flag.
433 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
// GODEBUG http2client=0 turns HTTP/2 off; count the non-default use.
434 if http2client.Value() == "0" {
435 http2client.IncNonDefault()
436 return
437 }
438
439
440
441
442
443
// If an "https" alternate protocol is registered and it is a single-field
// struct whose field implements h2Transport, adopt that field as the HTTP/2
// transport and stop. The check goes through reflection on the registered
// value.
444 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
445 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
446 if v := rv.Field(0); v.CanInterface() {
447 if h2i, ok := v.Interface().(h2Transport); ok {
448 t.h2transport = h2i
449 return
450 }
451 }
452 }
453
// An existing "h2" entry means something else already handles HTTP/2.
454 if _, ok := t.TLSNextProto["h2"]; ok {
455
456 return
457 }
// Nothing to configure when neither HTTP/2 flavour is enabled.
458 protocols := t.protocols()
459 if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
460 return
461 }
// The bundled HTTP/2 implementation may be omitted from the build.
462 if omitBundledHTTP2 {
463 return
464 }
465 t2, err := http2configureTransports(t)
466 if err != nil {
// A configuration failure is logged, not returned: this function has no
// error result.
467 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
468 return
469 }
470 t.h2transport = t2
471
472
473
474
475
476
477
// Carry MaxResponseHeaderBytes over to the HTTP/2 header list limit when the
// latter is unset, clamping to the largest value a uint32 can hold.
478 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
479 const h2max = 1<<32 - 1
480 if limit1 >= h2max {
481 t2.MaxHeaderListSize = h2max
482 } else {
483 t2.MaxHeaderListSize = uint32(limit1)
484 }
485 }
486
487
488
489
490
491
// Align the TLS NextProtos list with the enabled protocols.
// NOTE(review): this dereferences t.TLSClientConfig; presumably
// http2configureTransports guarantees it is non-nil - confirm.
492 t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
493 }
494
495 func (t *Transport) protocols() Protocols {
496 if t.Protocols != nil {
497 return *t.Protocols
498 }
499 var p Protocols
500 p.SetHTTP1(true)
501 switch {
502 case t.TLSNextProto != nil:
503
504
505 if t.TLSNextProto["h2"] != nil {
506 p.SetHTTP2(true)
507 }
508 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
509
510
511
512
513
514
515 case http2client.Value() == "0":
516 default:
517 p.SetHTTP2(true)
518 }
519 return p
520 }
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
539 return envProxyFunc()(req.URL)
540 }
541
542
543
544 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
545 return func(*Request) (*url.URL, error) {
546 return fixedURL, nil
547 }
548 }
549
550
551
552
553 type transportRequest struct {
554 *Request
555 extra Header
556 trace *httptrace.ClientTrace
557
558 ctx context.Context
559 cancel context.CancelCauseFunc
560
561 mu sync.Mutex
562 err error
563 }
564
565 func (tr *transportRequest) extraHeaders() Header {
566 if tr.extra == nil {
567 tr.extra = make(Header)
568 }
569 return tr.extra
570 }
571
572 func (tr *transportRequest) setError(err error) {
573 tr.mu.Lock()
574 if tr.err == nil {
575 tr.err = err
576 }
577 tr.mu.Unlock()
578 }
579
580
581
582 func (t *Transport) useRegisteredProtocol(req *Request) bool {
583 if req.URL.Scheme == "https" && req.requiresHTTP1() {
584
585
586
587
588 return false
589 }
590 return true
591 }
592
593
594
595
596 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
597 if !t.useRegisteredProtocol(req) {
598 return nil
599 }
600 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
601 return altProto[req.URL.Scheme]
602 }
603
604 func validateHeaders(hdrs Header) string {
605 for k, vv := range hdrs {
606 if !httpguts.ValidHeaderFieldName(k) {
607 return fmt.Sprintf("field name %q", k)
608 }
609 for _, v := range vv {
610 if !httpguts.ValidHeaderFieldValue(v) {
611
612
613 return fmt.Sprintf("field value for %q", k)
614 }
615 }
616 }
617 return ""
618 }
619
620
// roundTrip sends req and returns its response. It validates the request,
// gives a registered alternate protocol the first chance to handle it, and
// then loops: obtain a connection, send the request, and either return the
// response or, when the failure is retryable, rewind the body and try again.
// The request body is closed on every early error return. The named result
// err is read by the deferred function below to cancel the request context
// on failure.
621 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
622 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
623 ctx := req.Context()
624 trace := httptrace.ContextClientTrace(ctx)
625
// Reject requests that are missing a URL or a header map.
626 if req.URL == nil {
627 req.closeBody()
628 return nil, errors.New("http: nil Request.URL")
629 }
630 if req.Header == nil {
631 req.closeBody()
632 return nil, errors.New("http: nil Request.Header")
633 }
634 scheme := req.URL.Scheme
635 isHTTP := scheme == "http" || scheme == "https"
// Headers and trailers are validated only for http/https; other schemes are
// left to an alternate RoundTripper.
636 if isHTTP {
637
638 if err := validateHeaders(req.Header); err != "" {
639 req.closeBody()
640 return nil, fmt.Errorf("net/http: invalid header %s", err)
641 }
642
643
644 if err := validateHeaders(req.Trailer); err != "" {
645 req.closeBody()
646 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
647 }
648 }
649
// Keep the caller's request for resp.Request and for cancellation
// bookkeeping; from here on req may be a copy with a tracking body.
650 origReq := req
651 req = setupRewindBody(req)
652
// Let a registered alternate protocol handle the request. Any result other
// than ErrSkipAltProtocol is final; on skip, rewind the body and continue.
653 if altRT := t.alternateRoundTripper(req); altRT != nil {
654 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
655 return resp, err
656 }
657 var err error
658 req, err = rewindBody(req)
659 if err != nil {
660 return nil, err
661 }
662 }
663 if !isHTTP {
664 req.closeBody()
665 return nil, badStringError("unsupported protocol scheme", scheme)
666 }
667 if req.Method != "" && !validMethod(req.Method) {
668 req.closeBody()
669 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
670 }
671 if req.URL.Host == "" {
672 req.closeBody()
673 return nil, errors.New("http: no Host in request URL")
674 }
675
676
677
678
679
680
681
682
683
684
// Derive a context that this function can cancel with a cause. ctx is
// reassigned here (it was declared above); cancel is new.
685 ctx, cancel := context.WithCancelCause(req.Context())
686
687
// Bridge the legacy Request.Cancel channel onto the new context.
688 if origReq.Cancel != nil {
689 go awaitLegacyCancel(ctx, cancel, origReq)
690 }
691
692
693
694
695
// Register the request so CancelRequest can find it; the returned cancel
// also removes the registration.
696 cancel = t.prepareTransportCancel(origReq, cancel)
697
// On any error return, cancel the context with that error as the cause.
698 defer func() {
699 if err != nil {
700 cancel(err)
701 }
702 }()
703
704 for {
// Stop before each attempt if the request was canceled.
705 select {
706 case <-ctx.Done():
707 req.closeBody()
708 return nil, context.Cause(ctx)
709 default:
710 }
711
712
// A fresh transportRequest is built per attempt because req may have been
// replaced by rewindBody.
713 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
714 cm, err := t.connectMethodForRequest(treq)
715 if err != nil {
716 req.closeBody()
717 return nil, err
718 }
719
720
721
722
723
// Obtain a connection: either an idle one or a newly dialed one.
724 pconn, err := t.getConn(treq, cm)
725 if err != nil {
726 req.closeBody()
727 return nil, err
728 }
729
730 var resp *Response
// A connection with a non-nil alt delegates to that RoundTripper.
731 if pconn.alt != nil {
732
733 resp, err = pconn.alt.RoundTrip(req)
734 } else {
735 resp, err = pconn.roundTrip(treq)
736 }
737 if err == nil {
738 if pconn.alt != nil {
739
740
741
742
743
// On the alt path this function releases the request context itself.
// NOTE(review): on the non-alt path the context is presumably released
// elsewhere once the response body is done - confirm.
744 cancel(errRequestDone)
745 }
// Report the caller's original request, not the internal copy.
746 resp.Request = origReq
747 return resp, nil
748 }
749
750
// The attempt failed: decide whether to retry.
751 if http2isNoCachedConnError(err) {
// Drop the unusable connection from the idle pool and release its
// per-host slot, then fall through to retry.
752 if t.removeIdleConn(pconn) {
753 t.decConnsPerHost(pconn.cacheKey)
754 }
755 } else if !pconn.shouldRetryRequest(req, err) {
756
757
// Not retryable: unwrap the internal wrapper error types so the caller
// sees the underlying error.
758 if e, ok := err.(nothingWrittenError); ok {
759 err = e.error
760 }
761 if e, ok := err.(transportReadFromServerError); ok {
762 err = e.err
763 }
// Close the body if nothing has closed it yet.
764 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
765
766
767
768 req.closeBody()
769 }
770 return nil, err
771 }
772 testHookRoundTripRetried()
773
774
// Retrying: get a fresh body if the old one was consumed or closed.
775 req, err = rewindBody(req)
776 if err != nil {
777 return nil, err
778 }
779 }
780 }
781
782 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
783 select {
784 case <-req.Cancel:
785 cancel(errRequestCanceled)
786 case <-ctx.Done():
787 }
788 }
789
// errCannotRewind is returned by rewindBody when the request body has been
// read or closed and the request has no GetBody function to recreate it.
790 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
791
// readTrackingBody wraps an io.ReadCloser and records whether Read or Close
// has been called on it.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set by every Read call
	didClose atomic.Bool // set by the first Close call
}

// Read marks the body as read and forwards to the wrapped reader.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close closes the wrapped body the first time it is called and returns its
// result. Every later call is a no-op that returns nil.
func (r *readTrackingBody) Close() error {
	if alreadyClosed := r.didClose.Swap(true); alreadyClosed {
		return nil
	}
	return r.ReadCloser.Close()
}
809
810
811
812
813
814 func setupRewindBody(req *Request) *Request {
815 if req.Body == nil || req.Body == NoBody {
816 return req
817 }
818 newReq := *req
819 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
820 return &newReq
821 }
822
823
824
825
826
827 func rewindBody(req *Request) (rewound *Request, err error) {
828 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
829 return req, nil
830 }
831 if !req.Body.(*readTrackingBody).didClose.Load() {
832 req.closeBody()
833 }
834 if req.GetBody == nil {
835 return nil, errCannotRewind
836 }
837 body, err := req.GetBody()
838 if err != nil {
839 return nil, err
840 }
841 newReq := *req
842 newReq.Body = &readTrackingBody{ReadCloser: body}
843 return &newReq, nil
844 }
845
846
847
848
849 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
850 if http2isNoCachedConnError(err) {
851
852
853
854
855
856
857 return true
858 }
859 if err == errMissingHost {
860
861 return false
862 }
863 if !pc.isReused() {
864
865
866
867
868
869
870
871 return false
872 }
873 if _, ok := err.(nothingWrittenError); ok {
874
875
876 return req.outgoingLength() == 0 || req.GetBody != nil
877 }
878 if !req.isReplayable() {
879
880 return false
881 }
882 if _, ok := err.(transportReadFromServerError); ok {
883
884
885 return true
886 }
887 if err == errServerClosedIdle {
888
889
890
891 return true
892 }
893 return false
894 }
895
896
// ErrSkipAltProtocol is a sentinel error a registered alternate RoundTripper
// can return from RoundTrip: roundTrip then ignores that attempt, rewinds the
// request body and continues with its own handling of the request.
897 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
898
899
900
901
902
903
904
905
906
907
908
909 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
910 t.altMu.Lock()
911 defer t.altMu.Unlock()
912
913 if scheme == "http/3" {
914 var ok bool
915 if t.h3transport, ok = rt.(dialClientConner); !ok {
916 panic("http: HTTP/3 RoundTripper does not implement DialClientConn")
917 }
918 }
919
920 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
921 if _, exists := oldMap[scheme]; exists {
922 panic("protocol " + scheme + " already registered")
923 }
924 newMap := maps.Clone(oldMap)
925 if newMap == nil {
926 newMap = make(map[string]RoundTripper)
927 }
928 newMap[scheme] = rt
929 t.altProto.Store(newMap)
930 }
931
932
933
934
935
936 func (t *Transport) CloseIdleConnections() {
937 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
938 t.idleMu.Lock()
939 m := t.idleConn
940 t.idleConn = nil
941 t.closeIdle = true
942 t.idleLRU = connLRU{}
943 t.idleMu.Unlock()
944 for _, conns := range m {
945 for _, pconn := range conns {
946 pconn.close(errCloseIdleConns)
947 }
948 }
949 t.connsPerHostMu.Lock()
950 t.dialsInProgress.all(func(w *wantConn) {
951 if w.cancelCtx != nil && !w.waiting() {
952 w.cancelCtx()
953 }
954 })
955 t.connsPerHostMu.Unlock()
956 if t2 := t.h2transport; t2 != nil {
957 t2.CloseIdleConnections()
958 }
959 }
960
961
962 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
963
964
965
966
967
968
969 cancel := func(err error) {
970 origCancel(err)
971 t.reqMu.Lock()
972 delete(t.reqCanceler, req)
973 t.reqMu.Unlock()
974 }
975 t.reqMu.Lock()
976 if t.reqCanceler == nil {
977 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
978 }
979 t.reqCanceler[req] = cancel
980 t.reqMu.Unlock()
981 return cancel
982 }
983
984
985
986
987
988
989
990 func (t *Transport) CancelRequest(req *Request) {
991 t.reqMu.Lock()
992 cancel := t.reqCanceler[req]
993 t.reqMu.Unlock()
994 if cancel != nil {
995 cancel(errRequestCanceled)
996 }
997 }
998
999
1000
1001
1002
// envProxyOnce guards the one-time initialization of envProxyFuncValue, the
// cached proxy function built from the environment by envProxyFunc.
// resetProxyConfig clears both.
1003 var (
1004 envProxyOnce sync.Once
1005 envProxyFuncValue func(*url.URL) (*url.URL, error)
1006 )
1007
1008
1009
1010 func envProxyFunc() func(*url.URL) (*url.URL, error) {
1011 envProxyOnce.Do(func() {
1012 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
1013 })
1014 return envProxyFuncValue
1015 }
1016
1017
1018 func resetProxyConfig() {
1019 envProxyOnce = sync.Once{}
1020 envProxyFuncValue = nil
1021 }
1022
1023 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
1024 cm.targetScheme = treq.URL.Scheme
1025 cm.targetAddr = canonicalAddr(treq.URL)
1026 if t.Proxy != nil {
1027 cm.proxyURL, err = t.Proxy(treq.Request)
1028 }
1029 cm.onlyH1 = treq.requiresHTTP1()
1030 return cm, err
1031 }
1032
1033
1034
1035 func (cm *connectMethod) proxyAuth() string {
1036 if cm.proxyURL == nil {
1037 return ""
1038 }
1039 if u := cm.proxyURL.User; u != nil {
1040 username := u.Username()
1041 password, _ := u.Password()
1042 return "Basic " + basicAuth(username, password)
1043 }
1044 return ""
1045 }
1046
1047
// Sentinel errors describing why a connection was not kept in, or was
// removed from, the idle pool. The "putIdleConn" ones are returned by
// tryPutIdleConn; errCloseIdleConns is the close reason used by
// CloseIdleConnections.
1048 var (
1049 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
1050 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
1051 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
1052 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
1053 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
1054 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
1055 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
1056 errIdleConnTimeout = errors.New("http: idle connection timeout")
1057
1058
1059
1060
1061
// errServerClosedIdle is one of the errors shouldRetryRequest treats as
// retryable for replayable requests on reused connections.
1062 errServerClosedIdle = errors.New("http: server closed idle connection")
1063 )
1064
1065
1066
1067
1068
1069
1070
1071
1072
// transportReadFromServerError wraps an error that occurred while reading
// from the server. shouldRetryRequest recognizes it by type, and roundTrip
// unwraps it before returning to the caller.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error formats the wrapped error with a fixed prefix.
func (e transportReadFromServerError) Error() string {
	const format = "net/http: Transport failed to read from server: %v"
	return fmt.Sprintf(format, e.err)
}
1082
1083 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1084 if err := t.tryPutIdleConn(pconn); err != nil {
1085 pconn.close(err)
1086 }
1087 }
1088
1089 func (t *Transport) maxIdleConnsPerHost() int {
1090 if v := t.MaxIdleConnsPerHost; v != 0 {
1091 return v
1092 }
1093 return DefaultMaxIdleConnsPerHost
1094 }
1095
1096
1097
1098
1099
1100
// tryPutIdleConn makes pconn available for reuse. It either hands pconn
// directly to a request waiting for an idle connection or stores it in the
// idle pool. It returns nil on success, or one of the putIdleConn errors
// explaining why the connection was not kept; the caller is then expected to
// close it (see putOrCloseIdleConn).
1101 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
// A negative MaxIdleConnsPerHost is treated like DisableKeepAlives.
1102 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1103 return errKeepAlivesDisabled
1104 }
1105 if pconn.isBroken() {
1106 return errConnBroken
1107 }
1108 pconn.markReused()
// Client connections (isClientConn) are not pooled here: the connection is
// marked not in flight and its availability is signaled on availch. Both
// panics guard internal invariants.
1109 if pconn.isClientConn {
1110
// Deferred first, so the hook runs last, after pconn.mu is released.
1111 defer pconn.internalStateHook()
1112 pconn.mu.Lock()
1113 defer pconn.mu.Unlock()
1114 if !pconn.inFlight {
1115 panic("pconn is not in flight")
1116 }
1117 pconn.inFlight = false
1118 select {
1119 case pconn.availch <- struct{}{}:
1120 default:
1121 panic("unable to make pconn available")
1122 }
1123 return nil
1124 }
1125
1126 t.idleMu.Lock()
1127 defer t.idleMu.Unlock()
1128
1129
1130
1131
// An alt connection that is already tracked in the idle LRU needs no
// further work.
1132 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1133 return nil
1134 }
1135
1136
1137
1138
1139
// Prefer handing the connection to a request already waiting for this key.
1140 key := pconn.cacheKey
1141 if q, ok := t.idleConnWait[key]; ok {
1142 done := false
1143 if pconn.alt == nil {
1144
1145
// A non-alt connection goes to the first waiter that accepts it.
1146 for q.len() > 0 {
1147 w := q.popFront()
1148 if w.tryDeliver(pconn, nil, time.Time{}) {
1149 done = true
1150 break
1151 }
1152 }
1153 } else {
1154
1155
1156
1157
// An alt connection is offered to every waiter, and done stays false so the
// connection is also stored in the pool below.
1158 for q.len() > 0 {
1159 w := q.popFront()
1160 w.tryDeliver(pconn, nil, time.Time{})
1161 }
1162 }
// q is a copy of the map value, so write it back (or drop the empty entry).
1163 if q.len() == 0 {
1164 delete(t.idleConnWait, key)
1165 } else {
1166 t.idleConnWait[key] = q
1167 }
1168 if done {
1169 return nil
1170 }
1171 }
1172
// CloseIdleConnections was called and no request has asked for an idle
// connection since: do not pool.
1173 if t.closeIdle {
1174 return errCloseIdle
1175 }
1176 if t.idleConn == nil {
1177 t.idleConn = make(map[connectMethodKey][]*persistConn)
1178 }
1179 idles := t.idleConn[key]
1180 if len(idles) >= t.maxIdleConnsPerHost() {
1181 return errTooManyIdleHost
1182 }
// Sanity check: the same connection must never be pooled twice. A duplicate
// terminates the process.
1183 for _, exist := range idles {
1184 if exist == pconn {
1185 log.Fatalf("dup idle pconn %p in freelist", pconn)
1186 }
1187 }
1188 t.idleConn[key] = append(idles, pconn)
1189 t.idleLRU.add(pconn)
// Enforce the global cap by evicting the oldest connection in the LRU.
1190 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1191 oldest := t.idleLRU.removeOldest()
1192 oldest.close(errTooManyIdle)
1193 t.removeIdleConnLocked(oldest)
1194 }
1195
1196
1197
1198
// Arm (or re-arm) the idle timer. This is done only for non-alt
// connections.
1199 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1200 if pconn.idleTimer != nil {
1201 pconn.idleTimer.Reset(t.IdleConnTimeout)
1202 } else {
1203 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1204 }
1205 }
1206 pconn.idleAt = time.Now()
1207 return nil
1208 }
1209
1210
1211
1212
// queueForIdleConn tries to satisfy w from the idle pool. It reports whether
// an idle connection was delivered to w. If none was, and a usable idle
// connection was not found at all, w is queued to receive the next
// connection that becomes idle for its key. Calling it with a nil w only
// clears the closeIdle flag.
1213 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1214 if t.DisableKeepAlives {
1215 return false
1216 }
1217
1218 t.idleMu.Lock()
1219 defer t.idleMu.Unlock()
1220
1221
1222
// A new request for a connection re-enables pooling after
// CloseIdleConnections.
1223 t.closeIdle = false
1224
1225 if w == nil {
1226
1227 return false
1228 }
1229
1230
1231
1232
// Connections idle since before oldTime have outlived IdleConnTimeout. A
// zero oldTime means no timeout applies.
1233 var oldTime time.Time
1234 if t.IdleConnTimeout > 0 {
1235 oldTime = time.Now().Add(-t.IdleConnTimeout)
1236 }
1237
1238
// Scan this key's idle list from the end (the most recently added entry).
1239 if list, ok := t.idleConn[w.key]; ok {
1240 stop := false
// This delivered shadows the named result; it is returned explicitly below.
1241 delivered := false
1242 for len(list) > 0 && !stop {
1243 pconn := list[len(list)-1]
1244
1245
1246
1247
// Round(0) strips the monotonic clock reading before comparing.
1248 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1249 if tooOld {
1250
1251
1252
// Close asynchronously: closeConnIfStillIdle runs in its own goroutine while
// idleMu is still held here.
1253 go pconn.closeConnIfStillIdle()
1254 }
1255 if pconn.isBroken() || tooOld {
1256
1257
1258
1259
1260
// Unusable: drop it from the list and look at the next candidate.
1261 list = list[:len(list)-1]
1262 continue
1263 }
1264 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1265 if delivered {
1266 if pconn.alt != nil {
1267
1268
// An alt connection stays in the idle list after delivery.
1269 } else {
1270
1271
// A non-alt connection now belongs to w alone: remove it from the pool.
1272 t.idleLRU.remove(pconn)
1273 list = list[:len(list)-1]
1274 }
1275 }
// Stop after the first usable candidate, whether or not w accepted it.
1276 stop = true
1277 }
1278 if len(list) > 0 {
1279 t.idleConn[w.key] = list
1280 } else {
1281 delete(t.idleConn, w.key)
1282 }
1283 if stop {
1284 return delivered
1285 }
1286 }
1287
1288
// No usable idle connection: register w as a waiter for this key.
1289 if t.idleConnWait == nil {
1290 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1291 }
1292 q := t.idleConnWait[w.key]
// Drop leading waiters that are no longer waiting before adding w.
1293 q.cleanFrontNotWaiting()
1294 q.pushBack(w)
1295 t.idleConnWait[w.key] = q
1296 return false
1297 }
1298
1299
1300 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1301 if pconn.isClientConn {
1302 return true
1303 }
1304 t.idleMu.Lock()
1305 defer t.idleMu.Unlock()
1306 return t.removeIdleConnLocked(pconn)
1307 }
1308
1309
1310 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1311 if pconn.idleTimer != nil {
1312 pconn.idleTimer.Stop()
1313 }
1314 t.idleLRU.remove(pconn)
1315 key := pconn.cacheKey
1316 pconns := t.idleConn[key]
1317 var removed bool
1318 switch len(pconns) {
1319 case 0:
1320
1321 case 1:
1322 if pconns[0] == pconn {
1323 delete(t.idleConn, key)
1324 removed = true
1325 }
1326 default:
1327 for i, v := range pconns {
1328 if v != pconn {
1329 continue
1330 }
1331
1332
1333 copy(pconns[i:], pconns[i+1:])
1334 t.idleConn[key] = pconns[:len(pconns)-1]
1335 removed = true
1336 break
1337 }
1338 }
1339 return removed
1340 }
1341
// zeroDialer is the net.Dialer, with all defaults, that dial uses when
// neither Transport.DialContext nor Transport.Dial is set.
1342 var zeroDialer net.Dialer
1343
1344 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1345 if t.DialContext != nil {
1346 c, err := t.DialContext(ctx, network, addr)
1347 if c == nil && err == nil {
1348 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1349 }
1350 return c, err
1351 }
1352 if t.Dial != nil {
1353 c, err := t.Dial(network, addr)
1354 if c == nil && err == nil {
1355 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1356 }
1357 return c, err
1358 }
1359 return zeroDialer.DialContext(ctx, network, addr)
1360 }
1361
1362
1363
1364
1365
1366
1367
1368 type wantConn struct {
1369 cm connectMethod
1370 key connectMethodKey
1371
1372
1373
1374
1375 beforeDial func()
1376 afterDial func()
1377
1378 mu sync.Mutex
1379 ctx context.Context
1380 cancelCtx context.CancelFunc
1381 done bool
1382 result chan connOrError
1383 }
1384
1385 type connOrError struct {
1386 pc *persistConn
1387 err error
1388 idleAt time.Time
1389 }
1390
1391
1392 func (w *wantConn) waiting() bool {
1393 w.mu.Lock()
1394 defer w.mu.Unlock()
1395
1396 return !w.done
1397 }
1398
1399
1400 func (w *wantConn) getCtxForDial() context.Context {
1401 w.mu.Lock()
1402 defer w.mu.Unlock()
1403
1404 return w.ctx
1405 }
1406
1407
1408 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1409 w.mu.Lock()
1410 defer w.mu.Unlock()
1411
1412 if w.done {
1413 return false
1414 }
1415 if (pc == nil) == (err == nil) {
1416 panic("net/http: internal error: misuse of tryDeliver")
1417 }
1418 w.ctx = nil
1419 w.done = true
1420
1421 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1422 close(w.result)
1423
1424 return true
1425 }
1426
1427
1428
1429 func (w *wantConn) cancel(t *Transport) {
1430 w.mu.Lock()
1431 var pc *persistConn
1432 if w.done {
1433 if r, ok := <-w.result; ok {
1434 pc = r.pc
1435 }
1436 } else {
1437 close(w.result)
1438 }
1439 w.ctx = nil
1440 w.done = true
1441 w.mu.Unlock()
1442
1443
1444
1445
1446 if pc != nil && pc.alt == nil {
1447 t.putOrCloseIdleConn(pc)
1448 }
1449 }
1450
1451
1452 type wantConnQueue struct {
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463 head []*wantConn
1464 headPos int
1465 tail []*wantConn
1466 }
1467
1468
1469 func (q *wantConnQueue) len() int {
1470 return len(q.head) - q.headPos + len(q.tail)
1471 }
1472
1473
1474 func (q *wantConnQueue) pushBack(w *wantConn) {
1475 q.tail = append(q.tail, w)
1476 }
1477
1478
1479 func (q *wantConnQueue) popFront() *wantConn {
1480 if q.headPos >= len(q.head) {
1481 if len(q.tail) == 0 {
1482 return nil
1483 }
1484
1485 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1486 }
1487 w := q.head[q.headPos]
1488 q.head[q.headPos] = nil
1489 q.headPos++
1490 return w
1491 }
1492
1493
1494 func (q *wantConnQueue) peekFront() *wantConn {
1495 if q.headPos < len(q.head) {
1496 return q.head[q.headPos]
1497 }
1498 if len(q.tail) > 0 {
1499 return q.tail[0]
1500 }
1501 return nil
1502 }
1503
1504
1505
1506 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1507 for {
1508 w := q.peekFront()
1509 if w == nil || w.waiting() {
1510 return cleaned
1511 }
1512 q.popFront()
1513 cleaned = true
1514 }
1515 }
1516
1517
1518 func (q *wantConnQueue) cleanFrontCanceled() {
1519 for {
1520 w := q.peekFront()
1521 if w == nil || w.cancelCtx != nil {
1522 return
1523 }
1524 q.popFront()
1525 }
1526 }
1527
1528
1529
1530 func (q *wantConnQueue) all(f func(*wantConn)) {
1531 for _, w := range q.head[q.headPos:] {
1532 f(w)
1533 }
1534 for _, w := range q.tail {
1535 f(w)
1536 }
1537 }
1538
1539 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1540 if t.DialTLSContext != nil {
1541 conn, err = t.DialTLSContext(ctx, network, addr)
1542 } else {
1543 conn, err = t.DialTLS(network, addr)
1544 }
1545 if conn == nil && err == nil {
1546 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1547 }
1548 return
1549 }
1550
1551
1552
1553
1554
// getConn returns a connection for treq using connect method cm: an idle one
// from the pool if available, otherwise a newly dialed one. It blocks until a
// connection or error is delivered, or until the request's context
// (treq.ctx) is done. When the request was canceled, that cancellation cause
// is returned in preference to the dial error.
1555 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1556 req := treq.Request
1557 trace := treq.trace
1558 ctx := req.Context()
1559 if trace != nil && trace.GetConn != nil {
1560 trace.GetConn(cm.addr())
1561 }
1562
1563
1564
1565
1566
1567
// The dial context keeps the request context's values but not its
// cancellation; the dial is canceled only through dialCancel.
1568 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1569
1570 w := &wantConn{
1571 cm: cm,
1572 key: cm.key(),
1573 ctx: dialCtx,
1574 cancelCtx: dialCancel,
// Buffered so tryDeliver's send never blocks.
1575 result: make(chan connOrError, 1),
1576 beforeDial: testHookPrePendingDial,
1577 afterDial: testHookPostPendingDial,
1578 }
// On any error return, withdraw the request for a connection; cancel also
// recycles a connection that was delivered but never picked up.
1579 defer func() {
1580 if err != nil {
1581 w.cancel(t)
1582 }
1583 }()
1584
1585
// Try the idle pool first; start (or queue) a dial only if nothing was
// delivered.
1586 if delivered := t.queueForIdleConn(w); !delivered {
1587 t.queueForDial(w)
1588 }
1589
1590
// Wait for the outcome or for the request to be canceled.
1591 select {
1592 case r := <-w.result:
1593
1594
// The GotConn trace hook is invoked only for non-alt connections.
1595 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1596 info := httptrace.GotConnInfo{
1597 Conn: r.pc.conn,
1598 Reused: r.pc.isReused(),
1599 }
// A non-zero idleAt means the connection came from the idle pool.
1600 if !r.idleAt.IsZero() {
1601 info.WasIdle = true
1602 info.IdleTime = time.Since(r.idleAt)
1603 }
1604 trace.GotConn(info)
1605 }
1606 if r.err != nil {
1607
1608
1609
// If the request was canceled in the meantime, report the cancellation
// cause instead of the dial error.
1610 select {
1611 case <-treq.ctx.Done():
1612 err := context.Cause(treq.ctx)
1613 if err == errRequestCanceled {
1614 err = errRequestCanceledConn
1615 }
1616 return nil, err
1617 default:
1618
1619 }
1620 }
1621 return r.pc, r.err
1622 case <-treq.ctx.Done():
// Canceled while waiting; the deferred w.cancel cleans up.
1623 err := context.Cause(treq.ctx)
1624 if err == errRequestCanceled {
1625 err = errRequestCanceledConn
1626 }
1627 return nil, err
1628 }
1629 }
1630
1631
1632
1633 func (t *Transport) queueForDial(w *wantConn) {
1634 w.beforeDial()
1635
1636 t.connsPerHostMu.Lock()
1637 defer t.connsPerHostMu.Unlock()
1638
1639 if t.MaxConnsPerHost <= 0 {
1640 t.startDialConnForLocked(w)
1641 return
1642 }
1643
1644 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1645 if t.connsPerHost == nil {
1646 t.connsPerHost = make(map[connectMethodKey]int)
1647 }
1648 t.connsPerHost[w.key] = n + 1
1649 t.startDialConnForLocked(w)
1650 return
1651 }
1652
1653 if t.connsPerHostWait == nil {
1654 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1655 }
1656 q := t.connsPerHostWait[w.key]
1657 q.cleanFrontNotWaiting()
1658 q.pushBack(w)
1659 t.connsPerHostWait[w.key] = q
1660 }
1661
1662
1663
1664 func (t *Transport) startDialConnForLocked(w *wantConn) {
1665 t.dialsInProgress.cleanFrontCanceled()
1666 t.dialsInProgress.pushBack(w)
1667 go func() {
1668 t.dialConnFor(w)
1669 t.connsPerHostMu.Lock()
1670 defer t.connsPerHostMu.Unlock()
1671 w.cancelCtx = nil
1672 }()
1673 }
1674
1675
1676
1677
1678 func (t *Transport) dialConnFor(w *wantConn) {
1679 defer w.afterDial()
1680 ctx := w.getCtxForDial()
1681 if ctx == nil {
1682 t.decConnsPerHost(w.key)
1683 return
1684 }
1685
1686 const isClientConn = false
1687 pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
1688 delivered := w.tryDeliver(pc, err, time.Time{})
1689 if err == nil && (!delivered || pc.alt != nil) {
1690
1691
1692
1693 t.putOrCloseIdleConn(pc)
1694 }
1695 if err != nil {
1696 t.decConnsPerHost(w.key)
1697 }
1698 }
1699
1700
1701
// decConnsPerHost releases one connection slot for key. If a request is
// queued waiting for a slot on that key, the slot is handed straight to it
// by starting its dial, and the count is left unchanged. Otherwise the count
// is decremented. It is a no-op when no per-host limit is configured.
1702 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1703 if t.MaxConnsPerHost <= 0 {
1704 return
1705 }
1706
1707 t.connsPerHostMu.Lock()
1708 defer t.connsPerHostMu.Unlock()
1709 n := t.connsPerHost[key]
1710 if n == 0 {
1711
1712
// Releasing a slot that was never counted is an internal invariant
// violation.
1713 panic("net/http: internal error: connCount underflow")
1714 }
1715
1716
1717
1718
1719
// Look for a queued request that is still waiting; waiters that have given
// up are discarded along the way.
1720 if q := t.connsPerHostWait[key]; q.len() > 0 {
1721 done := false
1722 for q.len() > 0 {
1723 w := q.popFront()
1724 if w.waiting() {
1725 t.startDialConnForLocked(w)
1726 done = true
1727 break
1728 }
1729 }
// q is a copy of the map value, so write it back (or drop the empty entry).
1730 if q.len() == 0 {
1731 delete(t.connsPerHostWait, key)
1732 } else {
1733
1734
1735 t.connsPerHostWait[key] = q
1736 }
// The slot was transferred to a waiter, so the count stays as it is.
1737 if done {
1738 return
1739 }
1740 }
1741
1742
// Nobody took the slot: decrement, removing the entry when it reaches zero.
1743 if n--; n == 0 {
1744 delete(t.connsPerHost, key)
1745 } else {
1746 t.connsPerHost[key] = n
1747 }
1748 }
1749
1750
1751
1752
// addTLS wraps pconn.conn in a TLS client connection and performs the
// handshake.
//
// name is used as the TLS ServerName when the Transport's TLS configuration
// leaves it empty. When the connection's cache key is marked onlyH1, no ALPN
// protocols are offered. If pconn.t.TLSHandshakeTimeout is non-zero, the
// handshake is abandoned with tlsHandshakeTimeoutError once it elapses.
//
// On success pconn.conn is replaced by the TLS connection and
// pconn.tlsState records its connection state. On failure the underlying
// plain connection is closed and the error is returned. The
// TLSHandshakeStart and TLSHandshakeDone trace hooks are called when set.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// cloneTLSConfig presumably returns a private copy, so the edits below
	// do not touch the user's TLSClientConfig — TODO confirm.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2: both the timeout callback and the handshake goroutine may
	// send, and neither must block.
	errc := make(chan error, 2)
	var timer *time.Timer
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	// The first value to arrive decides the outcome.
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The timer fired first. Closing the connection above makes
			// the handshake return; wait for that goroutine's result so
			// it has finished before we report.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1801
// erringRoundTripper is implemented by round trippers that can report an
// error via RoundTripErr. dialConn checks for it on the value returned by a
// TLSNextProto factory and, when present, returns that error instead of a
// connection.
type erringRoundTripper interface {
	RoundTripErr() error
}
1805
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It defaults to context.WithTimeout and is a
// variable, presumably so that tests can substitute it.
var testHookProxyConnectTimeout = context.WithTimeout
1807
// dialConn establishes a new connection for the connect method cm and
// returns it wrapped in a persistConn.
//
// The steps, in order:
//   - If the Transport's protocols include HTTP/3, the dial is delegated to
//     t.h3transport and the result is returned as an alt round tripper.
//   - Otherwise a TCP (or custom TLS) connection is dialed to cm.addr(),
//     which is the proxy address when a proxy is configured, and TLS is
//     added when cm.scheme() is "https".
//   - If a proxy is in use, the proxy-specific setup runs: SOCKS5
//     negotiation, Proxy-Authorization for plain HTTP targets, or a CONNECT
//     request followed by TLS to the target for HTTPS targets.
//   - Finally the protocol is selected: an HTTP/2 client conn (when
//     isClientConn), unencrypted HTTP/2, a TLSNextProto handler for the
//     negotiated ALPN protocol, or plain HTTP/1 with readLoop and writeLoop
//     goroutines started.
//
// internalStateHook is stored on the persistConn or passed on to the
// alternate protocol implementation.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
	// HTTP/3 is exclusive: it cannot be combined with any other protocol.
	if p := t.protocols(); p.http3() {
		if p.HTTP1() || p.HTTP2() || p.UnencryptedHTTP2() {
			return nil, errors.New("http: when using HTTP3, Transport.Protocols must contain only HTTP3")
		}
		if t.h3transport == nil {
			return nil, errors.New("http: Transport.Protocols contains HTTP3, but Transport does not support HTTP/3")
		}
		rt, err := t.h3transport.DialClientConn(ctx, cm.addr(), cm.proxyURL, internalStateHook)
		if err != nil {
			return nil, err
		}
		return &persistConn{
			t:        t,
			cacheKey: cm.key(),
			alt:      rt,
		}, nil
	}

	pconn = &persistConn{
		t:                 t,
		cacheKey:          cm.key(),
		reqch:             make(chan requestAndChan, 1),
		writech:           make(chan writeRequest, 1),
		closech:           make(chan struct{}),
		writeErrCh:        make(chan error, 1),
		writeLoopDone:     make(chan struct{}),
		isClientConn:      isClientConn,
		internalStateHook: internalStateHook,
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr marks errors from the initial dial as proxy connect failures
	// when a proxy is configured.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// Run the handshake here so that the connection state read
			// below is populated even if the custom dialer did not
			// handshake itself.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				// Close in the background rather than blocking on it.
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			// NOTE(review): the connection dialed above is not closed on
			// this error path — confirm whether that is intentional.
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			// addTLS closes the plain connection itself on failure.
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// No proxy: nothing to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			// A missing password is left as the empty string.
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests go to the proxy
		// directly, with credentials added to each request's headers.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone first so the user-supplied header is not modified.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange to one minute (by default) in
		// addition to whatever deadline ctx already carries.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the request and read the response in a goroutine so that
		// the select below can give up when connectCtx is done.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// The response headers are read through a reader limited to
			// the Transport's maximum response header size.
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing the connection unblocks the goroutine; wait for it
			// before returning.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err is now set.
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the code) as the
			// error message.
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With the tunnel established, start TLS to the target itself.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 is chosen only for a non-TLS connection when the
	// configured protocols allow it and do not also allow HTTP/1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()

	if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
		altProto, _ := t.altProto.Load().(map[string]RoundTripper)
		h2, ok := altProto["https"].(newClientConner)
		if !ok {
			return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
		}
		alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
		if err != nil {
			pconn.conn.Close()
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
	}

	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// The protocol handler reported a failure instead of a usable
			// round tripper.
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// Hand the connection to a registered handler for the ALPN-negotiated
	// protocol, if there is one.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// The protocol handler reported a failure instead of a
				// usable round tripper.
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// Plain HTTP/1: reads go through pconn (enforcing readLimit) and writes
	// through persistConnWriter (counting bytes in nwrite).
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
2059
2060
2061
2062
2063
2064
2065
2066 type persistConnWriter struct {
2067 pc *persistConn
2068 }
2069
2070 func (w persistConnWriter) Write(p []byte) (n int, err error) {
2071 n, err = w.pc.conn.Write(p)
2072 w.pc.nwrite += int64(n)
2073 return
2074 }
2075
2076
2077
2078
2079 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
2080 n, err = io.Copy(w.pc.conn, r)
2081 w.pc.nwrite += n
2082 return
2083 }
2084
2085 var _ io.ReaderFrom = (*persistConnWriter)(nil)
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
// connectMethod describes how to reach a target: directly, or through a
// proxy. Its key method derives the connectMethodKey under which
// connections are pooled.
type connectMethod struct {
	_ incomparable
	// proxyURL is nil for a direct connection, otherwise the proxy to use.
	proxyURL *url.URL
	// targetScheme is the scheme of the final target ("http" or "https" in
	// the code that consumes it).
	targetScheme string

	// targetAddr is the address of the final target. It is passed to the
	// proxy (SOCKS dial, CONNECT) and to net.SplitHostPort-style helpers,
	// so it is presumably always in host:port form — TODO confirm.
	targetAddr string
	// onlyH1 is copied into the key; when set, addTLS offers no ALPN
	// protocols.
	onlyH1 bool
}
2113
2114 func (cm *connectMethod) key() connectMethodKey {
2115 proxyStr := ""
2116 targetAddr := cm.targetAddr
2117 if cm.proxyURL != nil {
2118 proxyStr = cm.proxyURL.String()
2119 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2120 targetAddr = ""
2121 }
2122 }
2123 return connectMethodKey{
2124 proxy: proxyStr,
2125 scheme: cm.targetScheme,
2126 addr: targetAddr,
2127 onlyH1: cm.onlyH1,
2128 }
2129 }
2130
2131
2132 func (cm *connectMethod) scheme() string {
2133 if cm.proxyURL != nil {
2134 return cm.proxyURL.Scheme
2135 }
2136 return cm.targetScheme
2137 }
2138
2139
2140 func (cm *connectMethod) addr() string {
2141 if cm.proxyURL != nil {
2142 return canonicalAddr(cm.proxyURL)
2143 }
2144 return cm.targetAddr
2145 }
2146
2147
2148
2149 func (cm *connectMethod) tlsHost() string {
2150 h := cm.targetAddr
2151 return removePort(h)
2152 }
2153
2154
2155
2156
// connectMethodKey is the comparable form of a connectMethod, used as a map
// key for connection bookkeeping.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String renders the key as "proxy|scheme|addr", with ",h1" appended to the
// scheme when onlyH1 is set. Intended for debugging output.
func (k connectMethodKey) String() string {
	h1Tag := ""
	if k.onlyH1 {
		h1Tag = ",h1"
	}
	return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1Tag, k.addr)
}
2170
2171
2172
// persistConn wraps a single connection owned by a Transport. For HTTP/1 it
// carries the buffered reader/writer and the channels used to talk to the
// readLoop and writeLoop goroutines; for alternate protocols only alt (plus
// t and cacheKey) is populated.
type persistConn struct {
	// alt, when non-nil, is the round tripper that handles this connection
	// instead of the HTTP/1 machinery (dialConn sets it for HTTP/2, HTTP/3
	// and TLSNextProto handlers).
	alt RoundTripper

	t        *Transport
	cacheKey connectMethodKey
	conn     net.Conn
	// tlsState is the TLS connection state, or nil for a non-TLS connection.
	tlsState *tls.ConnectionState
	br       *bufio.Reader // reads from the connection via persistConn.Read
	bw       *bufio.Writer // writes to the connection via persistConnWriter
	nwrite   int64         // bytes written, maintained by persistConnWriter
	// reqch is sent to by roundTrip and received from by readLoop.
	reqch chan requestAndChan
	// writech is sent to by roundTrip and received from by writeLoop.
	writech chan writeRequest
	// closech is closed by closeLocked when an HTTP/1 connection is closed.
	closech chan struct{}
	// availch is waited on by waitForAvailability.
	availch chan struct{}
	// isProxy is set when requests are sent to an HTTP proxy for a plain
	// "http" target; it is passed to Request.write.
	isProxy bool
	// sawEOF records that a read from conn returned io.EOF.
	sawEOF       bool
	isClientConn bool
	// readLimit is the number of bytes Read will still allow.
	readLimit int64

	// writeErrCh carries the result of each request write from writeLoop
	// to wroteRequest.
	writeErrCh chan error

	// writeLoopDone is closed when writeLoop exits.
	writeLoopDone chan struct{}

	// Idle bookkeeping; not used by the code in this part of the file.
	idleAt    time.Time
	idleTimer *time.Timer

	// mu is held by the code in this file when it accesses the fields below.
	mu                   sync.Mutex
	numExpectedResponses int
	// closed is the reason the connection was closed; nil while open.
	closed error
	// canceledErr is set by cancelRequest.
	canceledErr error
	// reused is set by markReused.
	reused   bool
	reserved bool
	inFlight bool
	// internalStateHook, when non-nil, is called as readLoop exits.
	internalStateHook func()

	// mutateHeaderFunc, when non-nil, is applied by roundTrip to each
	// request's extra headers (dialConn uses it for Proxy-Authorization).
	// It is cleared when the connection is closed.
	mutateHeaderFunc func(Header)
}
2220
2221 func (pc *persistConn) maxHeaderResponseSize() int64 {
2222 return pc.t.maxHeaderResponseSize()
2223 }
2224
2225 func (pc *persistConn) Read(p []byte) (n int, err error) {
2226 if pc.readLimit <= 0 {
2227 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2228 }
2229 if int64(len(p)) > pc.readLimit {
2230 p = p[:pc.readLimit]
2231 }
2232 n, err = pc.conn.Read(p)
2233 if err == io.EOF {
2234 pc.sawEOF = true
2235 }
2236 pc.readLimit -= int64(n)
2237 return
2238 }
2239
2240
2241 func (pc *persistConn) isBroken() bool {
2242 pc.mu.Lock()
2243 b := pc.closed != nil
2244 pc.mu.Unlock()
2245 return b
2246 }
2247
2248
2249
2250 func (pc *persistConn) canceled() error {
2251 pc.mu.Lock()
2252 defer pc.mu.Unlock()
2253 return pc.canceledErr
2254 }
2255
2256
2257 func (pc *persistConn) isReused() bool {
2258 pc.mu.Lock()
2259 r := pc.reused
2260 pc.mu.Unlock()
2261 return r
2262 }
2263
// cancelRequest records err as the cancellation cause and closes the
// connection with errRequestCanceled. The recorded cause is what canceled
// later reports, so callers see err rather than the generic close reason.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2270
2271
2272
2273
// closeConnIfStillIdle closes the connection with errIdleConnTimeout, but
// only if it is still present in the Transport's idle LRU. The check and the
// removal both happen under t.idleMu, so a connection that has been taken
// out of the idle set in the meantime is left alone.
func (pc *persistConn) closeConnIfStillIdle() {
	t := pc.t
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	if _, ok := t.idleLRU.m[pc]; !ok {
		// No longer idle: nothing to do.
		return
	}
	t.removeIdleConnLocked(pc)
	pc.close(errIdleConnTimeout)
}
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2295 if err == nil {
2296 return nil
2297 }
2298
2299
2300
2301
2302
2303
2304
2305
2306 <-pc.writeLoopDone
2307
2308
2309
2310
2311 if cerr := pc.canceled(); cerr != nil {
2312 return cerr
2313 }
2314
2315
2316 req.mu.Lock()
2317 reqErr := req.err
2318 req.mu.Unlock()
2319 if reqErr != nil {
2320 return reqErr
2321 }
2322
2323 if err == errServerClosedIdle {
2324
2325 return err
2326 }
2327
2328 if _, ok := err.(transportReadFromServerError); ok {
2329 if pc.nwrite == startBytesWritten {
2330 return nothingWrittenError{err}
2331 }
2332
2333 return err
2334 }
2335 if pc.isBroken() {
2336 if pc.nwrite == startBytesWritten {
2337 return nothingWrittenError{err}
2338 }
2339 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2340 }
2341 return err
2342 }
2343
2344
2345
2346
// errCallerOwnsConn is the close reason readLoop uses when the response body
// is writable, i.e. the caller takes over the connection. closeLocked does
// not close the underlying net.Conn when it is given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2348
2349
2350
2351
// maxPostCloseReadBytes is the largest number of leftover body bytes that
// maybeDrainBody will read and discard.
const maxPostCloseReadBytes = 256 << 10

// maxPostCloseReadTime is how long maybeDrainBody waits for the drain to
// finish before giving up.
const maxPostCloseReadTime = 50 * time.Millisecond

// maybeDrainBody tries to read body to EOF, discarding the data. It reports
// true only if EOF was reached within maxPostCloseReadBytes bytes and within
// maxPostCloseReadTime. The read runs in its own goroutine, which may still
// be running when false is returned because of the timeout.
func maybeDrainBody(body io.Reader) bool {
	result := make(chan bool, 1)
	go func() {
		// Ask for one byte more than the cap: hitting EOF first means the
		// body fit; a nil error means there was more than we will drain.
		_, err := io.CopyN(io.Discard, body, maxPostCloseReadBytes+1)
		result <- err == io.EOF
	}()

	deadline := time.After(maxPostCloseReadTime)
	select {
	case ok := <-result:
		return ok
	case <-deadline:
		return false
	}
}
2375
// readLoop is the goroutine, started by dialConn, that reads responses from
// an HTTP/1 connection. For each request received on pc.reqch it reads the
// response, delivers it (or an error) on the request's channel, waits for
// the body to be consumed when there is one, and decides whether the
// connection can go back to the idle pool. It loops while the connection
// stays reusable.
//
// On exit it closes the connection with closeErr, removes it from the idle
// pool, and calls pc.internalStateHook if set.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default close reason, may be refined below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
		if pc.internalStateHook != nil {
			pc.internalStateHook()
		}
	}()

	// tryPutIdleConn attempts to return pc to the idle pool and reports the
	// outcome to the PutIdleConn trace hook (errKeepAlivesDisabled is not
	// reported). On failure the error becomes the close reason.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to hold a body Read/Close callback until this loop has
	// finished its idle-pool decision: the callbacks below block receiving
	// from it, and are released either by an explicit send from the loop or
	// by the deferred close when readLoop exits.
	eofc := make(chan struct{})
	defer close(eofc) // unblock any callback still waiting

	// Snapshot the test hook under testHookMu.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		// Each response gets a fresh header-size allowance.
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data (or an error) arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				// The allowance ran out: report that instead of the
				// underlying read error.
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // headers are done; no limit on the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// No keep-alive when either side asked to close, when the
			// final response is an informational (1xx) status, or when the
			// caller takes over the connection.
			alive = false
		}

		if !hasBody || bodyWritable {
			// Try to put the connection back into the idle pool before
			// delivering the response, so it is already available if the
			// caller immediately issues another request.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// rc.ch is unbuffered, so roundTrip has received the response
			// by now and is no longer watching this connection; exiting
			// (when alive is false) is safe.
			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body: wrap it so this loop learns when the
		// caller reaches EOF, hits an error, or closes it early. Capacity 2
		// because both fn and earlyCloseFn may send.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // released by the loop, or by the deferred close
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // released by the loop, or by the deferred close
				} else if err != nil {
					// Prefer the cancellation cause over the raw error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The Transport asked for gzip itself, so decompress
			// transparently and hide the encoding from the caller.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before reading the next response, wait for the caller to finish
		// with the body, or for cancellation or connection close.
		select {
		case bodyEOF := <-waitForBodyRead:
			// If the body was not read to EOF but is small enough (or of
			// unknown length, -1), try to drain what is left so the
			// connection can still be reused.
			tryDrain := !bodyEOF && resp.ContentLength <= maxPostCloseReadBytes
			if tryDrain {
				// NOTE(review): this send completes only once a body
				// callback receives from eofc (earlyCloseFn does; fn does
				// so only on EOF) — confirm the non-EOF read-error path
				// cannot stall here until the body is closed.
				eofc <- struct{}{}
				bodyEOF = maybeDrainBody(body.body)
			}
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if !tryDrain && bodyEOF {
				// Release the reader that is blocked in fn.
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2561
2562 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2563 if pc.closed != nil {
2564 return
2565 }
2566 if n := pc.br.Buffered(); n > 0 {
2567 buf, _ := pc.br.Peek(n)
2568 if is408Message(buf) {
2569 pc.closeLocked(errServerClosedIdle)
2570 return
2571 } else {
2572 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2573 }
2574 }
2575 if peekErr == io.EOF {
2576
2577 pc.closeLocked(errServerClosedIdle)
2578 } else {
2579 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2580 }
2581 }
2582
2583
2584
2585
// is408Message reports whether buf begins with an HTTP/1.x status line
// carrying status 408: "HTTP/1." followed by any single byte and " 408".
// Buffers shorter than "HTTP/1.x 408" never match.
func is408Message(buf []byte) bool {
	const (
		versionPrefix = "HTTP/1."
		statusPart    = " 408"
	)
	if len(buf) < len("HTTP/1.x 408") {
		return false
	}
	return string(buf[:7]) == versionPrefix && string(buf[8:12]) == statusPart
}
2595
2596
2597
2598
// readResponse reads the final response for the request in rc from pc.br,
// skipping over non-terminal 1xx responses, and returns it with its TLS
// field set to the connection's TLS state.
//
// trace hooks (GotFirstResponseByte, Got100Continue, Got1xxResponse) are
// invoked when set. If rc.continueCh is non-nil, the writer waiting on it
// is told whether to send the request body: a value is sent when a
// "100 Continue" arrives or when the connection will be kept open after the
// final response, and the channel is closed when either side requested
// close.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to go ahead with the body; signal only once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols is a final response, not an interim one.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// The 1xx response was handed to the trace hook, so it
				// does not count against the header allowance: reset the
				// limit. Without a hook, all interim and final headers
				// share one allowance.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// After a protocol switch the body is the raw connection,
		// preceded by anything already buffered in pc.br.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// A final response arrived without a preceding 100 Continue.
		// waitForContinue treats a received value as "send the body" and a
		// closed channel as "do not send it". When the connection is going
		// to be closed anyway the body is skipped; otherwise it is still
		// sent, presumably so the connection remains usable afterwards.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh)
		} else {
			continueCh <- struct{}{}
		}
	}

	resp.TLS = pc.tlsState
	return
}
2667
2668
2669
2670
2671 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2672 if continueCh == nil {
2673 return nil
2674 }
2675 return func() bool {
2676 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2677 defer timer.Stop()
2678
2679 select {
2680 case _, ok := <-continueCh:
2681 return ok
2682 case <-timer.C:
2683 return true
2684 case <-pc.closech:
2685 return false
2686 }
2687 }
2688 }
2689
2690 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2691 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2692 if br.Buffered() != 0 {
2693 body.br = br
2694 }
2695 return body
2696 }
2697
2698
2699
2700
2701
2702
// readWriteCloserBody is the response body used after a protocol switch. It
// is an io.ReadWriteCloser over the connection that first drains any bytes
// left in a bufio.Reader.
type readWriteCloserBody struct {
	_ incomparable
	// br holds already-buffered data to serve first; it is nil when there
	// is none (or once it has been fully consumed).
	br *bufio.Reader
	io.ReadWriteCloser
}
2708
2709 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2710 if b.br != nil {
2711 if n := b.br.Buffered(); len(p) > n {
2712 p = p[:n]
2713 }
2714 n, err = b.br.Read(p)
2715 if b.br.Buffered() == 0 {
2716 b.br = nil
2717 }
2718 return n, err
2719 }
2720 return b.ReadWriteCloser.Read(p)
2721 }
2722
2723 func (b *readWriteCloserBody) CloseWrite() error {
2724 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2725 return cw.CloseWrite()
2726 }
2727 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2728 }
2729
2730
// nothingWrittenError wraps an error that occurred before any bytes of the
// request were written to the connection.
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error so errors.Is and errors.As can see it.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2738
// writeLoop is the goroutine, started by dialConn, that writes requests to
// an HTTP/1 connection. It serves write requests from pc.writech until a
// write fails or the connection is closed, and closes pc.writeLoopDone on
// exit.
//
// The result of each write is sent both on pc.writeErrCh (consumed by
// wroteRequest) and on the request's own channel (consumed by roundTrip).
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body rather
				// than from the connection. Record it on the request;
				// mapRoundTripError gives an error stored on the request
				// precedence over connection-level errors.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				// Mark failures where not a single byte went out.
				if pc.nwrite == startBytesWritten {
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // for wroteRequest, called from readLoop
			wr.ch <- err         // for roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2776
2777
2778
2779
2780
2781
2782
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the result
// of the request write when it is not immediately available. If the write
// has not completed by then, the connection is not reused.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2784
2785
2786
2787 func (pc *persistConn) wroteRequest() bool {
2788 select {
2789 case err := <-pc.writeErrCh:
2790
2791
2792 return err == nil
2793 default:
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2805 defer t.Stop()
2806 select {
2807 case err := <-pc.writeErrCh:
2808 return err == nil
2809 case <-t.C:
2810 return false
2811 }
2812 }
2813 }
2814
2815
2816
// responseAndError is what readLoop sends back to roundTrip. Exactly one of
// res and err is expected to be set; roundTrip panics otherwise.
type responseAndError struct {
	_   incomparable
	res *Response // nil if err is set
	err error
}
2822
// requestAndChan is the message roundTrip sends to readLoop on pc.reqch to
// announce a request whose response should be read.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	// ch receives the response or error; roundTrip creates it unbuffered.
	ch chan responseAndError

	// addedGzip records that the Transport itself added
	// "Accept-Encoding: gzip", in which case readLoop transparently
	// decompresses a gzip-encoded response.
	addedGzip bool

	// continueCh, when non-nil, is used by readResponse to tell the writer
	// whether to send the request body (see waitForContinue).
	continueCh chan<- struct{}

	// callerGone is closed when roundTrip returns.
	callerGone <-chan struct{}
}
2841
2842
2843
2844
2845
// writeRequest is the message roundTrip sends to writeLoop on pc.writech to
// have a request written to the connection.
type writeRequest struct {
	req *transportRequest
	// ch receives the result of the write.
	ch chan<- error

	// continueCh, when non-nil, is what the writer waits on before sending
	// the request body (see waitForContinue).
	continueCh <-chan struct{}
}
2855
2856
2857
// timeoutError is an error that reports itself as both a timeout and
// temporary, and that matches context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string
}

// Error returns the stored message.
func (te *timeoutError) Error() string {
	return te.err
}

// Timeout always reports true.
func (te *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (te *timeoutError) Temporary() bool {
	return true
}

// Is reports whether target is context.DeadlineExceeded.
func (te *timeoutError) Is(target error) bool {
	return target == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when the response header timeout
// expires.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2868
2869
2870
// errRequestCanceled is the close reason used by cancelRequest. It is the
// same value as http2errRequestCanceled, so both protocol implementations
// report the identical error.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is, per its message, the error for a request
// canceled while it was still waiting for a connection. It is not used in
// this part of the file.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")

// errRequestDone is the cause readLoop passes when it cancels a request's
// context after the request has completed.
var errRequestDone = errors.New("net/http: request completed")
2877
// nop does nothing. It is the default value of the test hooks below.
func nop() {}

// Test hooks. They default to no-ops and are presumably replaced by tests
// to observe or synchronize with specific points in the transport.
var (
	testHookEnterRoundTrip   = nop
	testHookWaitResLoop      = nop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is locked by readLoop while it snapshots
	// testHookReadLoopBeforeNextRead. It defaults to a fakeLocker.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop
)
2891
2892 func (pc *persistConn) waitForAvailability(ctx context.Context) error {
2893 select {
2894 case <-pc.availch:
2895 return nil
2896 case <-pc.closech:
2897 return pc.closed
2898 case <-ctx.Done():
2899 return ctx.Err()
2900 }
2901 }
2902
// roundTrip sends req over this HTTP/1 connection and waits for the
// response.
//
// It hands the request to writeLoop (via pc.writech) and to readLoop (via
// pc.reqch), then waits for whichever comes first: a write error, the
// connection closing, the response header timeout, the response itself, or
// cancellation of the request's context. Errors are passed through
// mapRoundTripError before being returned, except for the response header
// timeout, which is returned as errTimeout.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()

	pc.mu.Lock()
	if pc.isClientConn {
		if !pc.reserved {
			// Not reserved for this caller: wait, without holding the
			// lock, until the connection becomes available.
			pc.mu.Unlock()
			if err := pc.waitForAvailability(req.ctx); err != nil {
				return nil, err
			}
			pc.mu.Lock()
		}
		pc.reserved = false
		pc.inFlight = true
	}
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when compression is enabled, the caller did not set
	// Accept-Encoding itself, the request is not a Range request, and the
	// method is not HEAD. readLoop decompresses the response transparently
	// when this flag is set and the server answered with gzip.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh coordinates "Expect: 100-continue" between readLoop and
	// writeLoop; it is only needed for HTTP/1.1+ requests with a body.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when this function returns, so readLoop does not block
	// forever trying to deliver a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Start the write. startBytesWritten lets mapRoundTripError tell
	// whether any part of this request reached the connection.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	// Announce the request to readLoop. resc is unbuffered.
	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse validates what readLoop delivered and maps its error.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	// respHeaderTimer stays nil (and so never fires) until the request has
	// been written and a ResponseHeaderTimeout is configured.
	var respHeaderTimer <-chan time.Time
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The write succeeded: start the response header timer.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// The connection closed, but a response is ready as well.
				// Prefer the response over the close error.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// The context is done, but a response is ready as well.
				// Prefer the response over the cancellation.
				return handleResponse(re)
			default:
			}
			// Cancel the request; this closes the connection, so the next
			// iteration returns through the pcClosed case.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
3052
3053
3054
3055 type tLogKey struct{}
3056
3057 func (tr *transportRequest) logf(format string, args ...any) {
3058 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
3059 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
3060 }
3061 }
3062
3063
3064
3065 func (pc *persistConn) markReused() {
3066 pc.mu.Lock()
3067 pc.reused = true
3068 pc.mu.Unlock()
3069 }
3070
3071
3072
3073
3074
3075
// close closes the connection with the given reason, taking pc.mu itself.
// err must be non-nil (closeLocked panics otherwise). Only the first close
// reason is recorded; later calls leave it unchanged.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
3081
// closeLocked closes the connection with the given reason. pc.mu must be
// held. It panics if err is nil.
//
// Only the first call has any closing effect: it records err in pc.closed
// and releases the per-host connection slot. For an HTTP/1 connection
// (pc.alt == nil) it also closes pc.closech and, unless err is
// errCallerOwnsConn, the underlying net.Conn. Every call clears
// pc.mutateHeaderFunc.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Connections with an alternate round tripper are not closed here;
		// presumably that implementation closes them itself.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
3100
// schemePort returns the default port for scheme: "80" for http, "443" for
// https, and "1080" for socks5 and socks5h. It returns "" for any other
// scheme.
func schemePort(scheme string) string {
	var port string
	switch scheme {
	case "socks5", "socks5h":
		port = "1080"
	case "https":
		port = "443"
	case "http":
		port = "80"
	}
	return port
}
3113
3114 func idnaASCIIFromURL(url *url.URL) string {
3115 addr := url.Hostname()
3116 if v, err := idnaASCII(addr); err == nil {
3117 addr = v
3118 }
3119 return addr
3120 }
3121
3122
3123 func canonicalAddr(url *url.URL) string {
3124 port := url.Port()
3125 if port == "" {
3126 port = schemePort(url.Scheme)
3127 }
3128 return net.JoinHostPort(idnaASCIIFromURL(url), port)
3129 }
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
// bodyEOFSignal wraps a response body and notifies its owner when the body
// is finished with.
//
// fn is called at most once, with the first error returned by the wrapped
// body's Read (including io.EOF), or with the result of closing the wrapped
// body; its return value replaces that error. earlyCloseFn, when set, is
// called instead if Close happens before a Read has returned io.EOF; in
// that case the wrapped body is not closed by Close.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // Close has been called
	rerr         error             // first non-nil error from body.Read
	fn           func(error) error // cleared after its single call
	earlyCloseFn func() error      // optional; see the type comment
}

var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")

// Read reads from the wrapped body. After Close it fails with
// errReadOnClosedResBody, and after a previous read error it keeps
// returning that error without touching the body again. The first error
// from the body is remembered and passed through fn.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, priorErr := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case priorErr != nil:
		return 0, priorErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed; second and later calls return nil. If the
// body was not read to io.EOF and earlyCloseFn is set, its result is
// returned. Otherwise the wrapped body is closed and the result is passed
// through fn (if fn has not run yet).
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true

	reachedEOF := es.rerr == io.EOF
	if !reachedEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn runs fn on err if fn has not been called yet, then clears it so it
// never runs twice. es.mu must be held.
func (es *bodyEOFSignal) condfn(err error) error {
	if callback := es.fn; callback != nil {
		err = callback(err)
		es.fn = nil
	}
	return err
}
3200
3201
3202
3203
3204
3205 type gzipReader struct {
3206 _ incomparable
3207 body *bodyEOFSignal
3208 mu sync.Mutex
3209 zr *gzip.Reader
3210 zerr error
3211 }
3212
// eofReader is an empty input source: every read reports io.EOF
// immediately. It provides both Read and ReadByte, which is the
// method set of flate.Reader.
type eofReader struct{}

// Read reports io.EOF without writing to the buffer.
func (eofReader) Read(_ []byte) (int, error) {
	return 0, io.EOF
}

// ReadByte reports io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}

// gzipPool recycles gzip.Reader values. Use gzipPoolGet and
// gzipPoolPut rather than touching the pool directly.
var gzipPool = sync.Pool{
	New: func() any { return &gzip.Reader{} },
}

// gzipPoolGet takes a gzip.Reader from the pool and resets it to read
// from r. If the reset fails, the reader goes straight back to the
// pool and the error is returned with a nil reader.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	zr := gzipPool.Get().(*gzip.Reader)
	err := zr.Reset(r)
	if err == nil {
		return zr, nil
	}
	gzipPoolPut(zr)
	return nil, err
}

// gzipPoolPut returns zr to the pool after detaching it from whatever
// source it was reading.
func gzipPoolPut(zr *gzip.Reader) {
	// Reset to an empty source so the pooled reader no longer points at
	// the previous body. gzip.Reader wraps any source that is not a
	// flate.Reader in a new bufio.Reader; the conversion asserts at
	// compile time that eofReader is one, so no buffer is allocated.
	// The error (the empty source has no gzip header) is deliberately
	// ignored: gzipPoolGet resets the reader again before it is used.
	_ = zr.Reset(flate.Reader(eofReader{}))
	gzipPool.Put(zr)
}
3238
3239
3240
3241 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3242 gz.mu.Lock()
3243 defer gz.mu.Unlock()
3244 if gz.zerr != nil {
3245 return nil, gz.zerr
3246 }
3247 if gz.zr == nil {
3248 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3249 if gz.zerr != nil {
3250 return nil, gz.zerr
3251 }
3252 }
3253 ret := gz.zr
3254 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3255 return ret, nil
3256 }
3257
3258
3259 func (gz *gzipReader) release(zr *gzip.Reader) {
3260 gz.mu.Lock()
3261 defer gz.mu.Unlock()
3262 if gz.zerr == errConcurrentReadOnResBody {
3263 gz.zr, gz.zerr = zr, nil
3264 } else {
3265 gzipPoolPut(zr)
3266 }
3267 }
3268
3269
3270
3271 func (gz *gzipReader) close() {
3272 gz.mu.Lock()
3273 defer gz.mu.Unlock()
3274 if gz.zerr == nil && gz.zr != nil {
3275 gzipPoolPut(gz.zr)
3276 gz.zr = nil
3277 }
3278 gz.zerr = errReadOnClosedResBody
3279 }
3280
3281 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3282 zr, err := gz.acquire()
3283 if err != nil {
3284 return 0, err
3285 }
3286 defer gz.release(zr)
3287
3288 return zr.Read(p)
3289 }
3290
3291 func (gz *gzipReader) Close() error {
3292 gz.close()
3293
3294 return gz.body.Close()
3295 }
3296
// tlsHandshakeTimeoutError is an error value carrying a fixed message
// for a TLS handshake timeout. It reports itself as both a timeout
// and a temporary error.
type tlsHandshakeTimeoutError struct{}

// Error returns the fixed error text.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}
3302
3303
3304
3305
// fakeLocker has Lock and Unlock methods that do nothing, so it can be
// used wherever a value with those two methods is required but no
// actual mutual exclusion is wanted.
type fakeLocker struct{}

// Lock is a no-op.
func (fakeLocker) Lock() {
}

// Unlock is a no-op.
func (fakeLocker) Unlock() {
}
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
// cloneTLSConfig returns a copy of cfg made with cfg.Clone, or a
// fresh zero-value tls.Config when cfg is nil. The result is never
// nil, so callers can modify it without a nil check.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3330
3331 type connLRU struct {
3332 ll *list.List
3333 m map[*persistConn]*list.Element
3334 }
3335
3336
3337 func (cl *connLRU) add(pc *persistConn) {
3338 if cl.ll == nil {
3339 cl.ll = list.New()
3340 cl.m = make(map[*persistConn]*list.Element)
3341 }
3342 ele := cl.ll.PushFront(pc)
3343 if _, ok := cl.m[pc]; ok {
3344 panic("persistConn was already in LRU")
3345 }
3346 cl.m[pc] = ele
3347 }
3348
3349 func (cl *connLRU) removeOldest() *persistConn {
3350 ele := cl.ll.Back()
3351 pc := ele.Value.(*persistConn)
3352 cl.ll.Remove(ele)
3353 delete(cl.m, pc)
3354 return pc
3355 }
3356
3357
3358 func (cl *connLRU) remove(pc *persistConn) {
3359 if ele, ok := cl.m[pc]; ok {
3360 cl.ll.Remove(ele)
3361 delete(cl.m, pc)
3362 }
3363 }
3364
3365
3366 func (cl *connLRU) len() int {
3367 return len(cl.m)
3368 }
3369
View as plain text