Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/ascii"
29 "net/textproto"
30 "net/url"
31 "reflect"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36 _ "unsafe"
37
38 "golang.org/x/net/http/httpguts"
39 "golang.org/x/net/http/httpproxy"
40 )
41
42
43
44
45
46
// DefaultTransport is the package-level RoundTripper value. It is a
// *Transport that looks up proxies with ProxyFromEnvironment, dials through
// a net.Dialer configured with 30-second Timeout and KeepAlive values, sets
// ForceAttemptHTTP2, keeps at most 100 idle connections for up to 90 seconds,
// and uses a 10-second TLS handshake timeout and a 1-second
// ExpectContinueTimeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
59
60
61
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// Transport.maxIdleConnsPerHost falls back to when
// Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Transport is the RoundTripper implementation in this file. It owns a pool
// of idle persistent connections, per-host connection accounting, the
// registry of alternate protocol handlers, and the user-facing configuration
// knobs below.
//
// The struct contains mutexes and a sync.Once, so it must not be copied after
// first use; use Clone to obtain an independent Transport with the same
// exported configuration.
type Transport struct {
	// idleMu guards closeIdle, idleConn, idleConnWait and idleLRU.
	idleMu sync.Mutex
	// closeIdle is set by CloseIdleConnections and cleared by
	// queueForIdleConn; while set, tryPutIdleConn refuses to pool connections.
	closeIdle bool
	// idleConn holds the idle connections per connect-method key.
	// queueForIdleConn hands out the entry at the end of each slice first.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnWait holds, per key, the requests waiting for an idle connection.
	idleConnWait map[connectMethodKey]wantConnQueue
	// idleLRU tracks idle connections so the oldest can be evicted when
	// MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler.
	reqMu sync.Mutex
	// reqCanceler maps an in-flight request to the cancel function that
	// CancelRequest invokes for it.
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto.
	altMu sync.Mutex
	// altProto holds a map[string]RoundTripper keyed by URL scheme (or nil).
	// registerProtocol stores a freshly cloned map on every registration.
	altProto atomic.Value

	// connsPerHostMu guards connsPerHost, connsPerHostWait and dialsInProgress.
	connsPerHostMu sync.Mutex
	// connsPerHost counts connections per key; it is only maintained when
	// MaxConnsPerHost is positive.
	connsPerHost map[connectMethodKey]int
	// connsPerHostWait holds, per key, the requests waiting for a dial slot.
	connsPerHostWait map[connectMethodKey]wantConnQueue
	// dialsInProgress records the wantConns for which a dial was started.
	dialsInProgress wantConnQueue

	// Proxy returns the proxy URL to use for a request. When Proxy is nil
	// no proxy lookup is performed; an error it returns aborts the request.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is an optional hook that receives the proxy
	// CONNECT request and response. Its call site is outside this section
	// of the file; see the net/http documentation for the exact contract.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, when non-nil, is used by dial to create connections and
	// takes precedence over Dial. Returning (nil, nil) is reported as an
	// error. When both DialContext and Dial are nil a zero net.Dialer is used.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is the context-less dial hook, consulted only when DialContext
	// is nil. Returning (nil, nil) is reported as an error.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext is an optional hook for creating TLS connections.
	// customDialTLS prefers it over DialTLS when both are set.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is the context-less TLS dial hook, used by customDialTLS only
	// when DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration that addTLS clones before
	// handing it to tls.Client. Clone deep-copies it.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds how long addTLS waits for the TLS
	// handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, stops connections from being returned to
	// the idle pool and stops requests from waiting for an idle connection.
	DisableKeepAlives bool

	// DisableCompression is part of the exported configuration and is
	// copied by Clone. The code that consults it is outside this section;
	// see the net/http documentation for its meaning.
	DisableCompression bool

	// MaxIdleConns caps the total number of idle connections across all
	// hosts. Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps the idle connections kept per host. Zero
	// selects DefaultMaxIdleConnsPerHost; a negative value disables idle
	// pooling altogether.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost caps the connections per host that queueForDial will
	// start; further requests wait for a slot. Zero or negative means no
	// limit.
	MaxConnsPerHost int

	// IdleConnTimeout is how long an idle connection may remain in the
	// pool before it is closed. Zero or negative means no idle timer.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is part of the exported configuration and is
	// copied by Clone. The code that enforces it is outside this section.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is part of the exported configuration and is
	// copied by Clone. The code that enforces it is outside this section.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a negotiated protocol name to a function that
	// produces the RoundTripper for that connection. If it already has an
	// "h2" entry, onceSetNextProtoDefaults leaves HTTP/2 configuration
	// alone.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader holds headers associated with proxy CONNECT
	// requests. Clone copies it; its use is outside this section.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is an optional hook returning headers for a
	// proxy CONNECT request to the given target. Its call site is outside
	// this section.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits the size of response headers. Zero or
	// negative selects the 10 MiB default (see maxHeaderResponseSize).
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size. Zero or negative selects
	// the 4 KiB default (see writeBufferSize).
	WriteBufferSize int

	// ReadBufferSize is the read buffer size. Zero or negative selects the
	// 4 KiB default (see readBufferSize).
	ReadBufferSize int

	// nextProtoOnce guards the one-time call to onceSetNextProtoDefaults.
	nextProtoOnce sync.Once
	// h2transport is the HTTP/2 transport, if one was discovered or
	// configured; CloseIdleConnections forwards to it.
	h2transport h2Transport
	// h3transport is set by registerProtocol when the "http/3" scheme is
	// registered.
	h3transport dialClientConner
	// tlsNextProtoWasNil records whether TLSNextProto was nil when
	// onceSetNextProtoDefaults ran; Clone uses it to decide whether to copy
	// TLSNextProto.
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 keeps HTTP/2 enabled in the derived protocol set
	// even when custom dialers or a TLS configuration are supplied (see
	// protocols).
	ForceAttemptHTTP2 bool

	// HTTP2 holds HTTP/2-specific configuration. Clone copies the pointed-to
	// value.
	HTTP2 *HTTP2Config

	// Protocols, when non-nil, is the exact protocol set to use, overriding
	// the set protocols would otherwise derive from the other fields.
	Protocols *Protocols
}
316
317 func (t *Transport) writeBufferSize() int {
318 if t.WriteBufferSize > 0 {
319 return t.WriteBufferSize
320 }
321 return 4 << 10
322 }
323
324 func (t *Transport) readBufferSize() int {
325 if t.ReadBufferSize > 0 {
326 return t.ReadBufferSize
327 }
328 return 4 << 10
329 }
330
331 func (t *Transport) maxHeaderResponseSize() int64 {
332 if t.MaxResponseHeaderBytes > 0 {
333 return t.MaxResponseHeaderBytes
334 }
335 return 10 << 20
336 }
337
338
339 func (t *Transport) Clone() *Transport {
340 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
341 t2 := &Transport{
342 Proxy: t.Proxy,
343 OnProxyConnectResponse: t.OnProxyConnectResponse,
344 DialContext: t.DialContext,
345 Dial: t.Dial,
346 DialTLS: t.DialTLS,
347 DialTLSContext: t.DialTLSContext,
348 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
349 DisableKeepAlives: t.DisableKeepAlives,
350 DisableCompression: t.DisableCompression,
351 MaxIdleConns: t.MaxIdleConns,
352 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
353 MaxConnsPerHost: t.MaxConnsPerHost,
354 IdleConnTimeout: t.IdleConnTimeout,
355 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
356 ExpectContinueTimeout: t.ExpectContinueTimeout,
357 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
358 GetProxyConnectHeader: t.GetProxyConnectHeader,
359 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
360 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
361 WriteBufferSize: t.WriteBufferSize,
362 ReadBufferSize: t.ReadBufferSize,
363 }
364 if t.TLSClientConfig != nil {
365 t2.TLSClientConfig = t.TLSClientConfig.Clone()
366 }
367 if t.HTTP2 != nil {
368 t2.HTTP2 = &HTTP2Config{}
369 *t2.HTTP2 = *t.HTTP2
370 }
371 if t.Protocols != nil {
372 t2.Protocols = &Protocols{}
373 *t2.Protocols = *t.Protocols
374 }
375 if !t.tlsNextProtoWasNil {
376 npm := maps.Clone(t.TLSNextProto)
377 if npm == nil {
378 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
379 }
380 t2.TLSNextProto = npm
381 }
382 return t2
383 }
384
// dialClientConner is the interface that an HTTP/3 RoundTripper must
// implement to be registered under the "http/3" scheme: registerProtocol
// type-asserts the registered RoundTripper to this interface and stores it in
// Transport.h3transport.
type dialClientConner interface {
	// DialClientConn returns a RoundTripper for the given address,
	// optionally through proxy.
	// NOTE(review): the semantics of internalStateHook are defined by the
	// caller, which is outside this section — presumably it is invoked when
	// the connection's internal state changes; confirm against dialConn.
	DialClientConn(ctx context.Context, address string, proxy *url.URL, internalStateHook func()) (RoundTripper, error)
}
413
// closeIdleConnectionser is an optional interface. Transport.CloseIdleConnections
// type-asserts h3transport to it and, when the assertion succeeds, forwards
// the call.
type closeIdleConnectionser interface {
	// CloseIdleConnections asks the implementation to close its idle
	// connections.
	CloseIdleConnections()
}
423
424
425
426
427
428
429
// h2Transport is the interface the Transport requires of its HTTP/2
// transport (Transport.h2transport). Only CloseIdleConnections is needed,
// which Transport.CloseIdleConnections forwards to.
type h2Transport interface {
	CloseIdleConnections()
}
433
434 func (t *Transport) hasCustomTLSDialer() bool {
435 return t.DialTLS != nil || t.DialTLSContext != nil
436 }
437
// http2client is the godebug setting named "http2client". Both
// onceSetNextProtoDefaults and protocols treat the value "0" as a request to
// leave HTTP/2 disabled.
var http2client = godebug.New("http2client")
439
440
441
// onceSetNextProtoDefaults performs the Transport's one-time HTTP/2 setup.
// It is run through t.nextProtoOnce, so callers must not invoke it directly.
func (t *Transport) onceSetNextProtoDefaults() {
	// Remember whether the caller left TLSNextProto unset; Clone reads this.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	if http2client.Value() == "0" {
		// HTTP/2 was turned off via the godebug setting; record the
		// non-default behavior and do nothing else.
		http2client.IncNonDefault()
		return
	}

	// If a RoundTripper is already registered for "https" and it is a
	// struct with exactly one field whose value implements h2Transport,
	// adopt that field as the HTTP/2 transport rather than configuring the
	// bundled one. Reflection is used because the concrete type is not
	// known here.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(h2Transport); ok {
				t.h2transport = h2i
				return
			}
		}
	}

	if _, ok := t.TLSNextProto["h2"]; ok {
		// The caller already installed an "h2" handler; leave it alone.
		return
	}
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		// Neither flavor of HTTP/2 is enabled.
		return
	}
	if omitBundledHTTP2 {
		// The bundled HTTP/2 implementation is not available in this build.
		return
	}

	t.configureHTTP2(protocols)
}
478
// protocols returns the set of protocols the Transport should use. An
// explicit t.Protocols always wins. Otherwise HTTP/1 is enabled and HTTP/2 is
// enabled according to the first matching rule in the switch below.
func (t *Transport) protocols() Protocols {
	if t.Protocols != nil {
		return *t.Protocols
	}
	var p Protocols
	p.SetHTTP1(true)
	switch {
	case t.TLSNextProto != nil:
		// The caller set TLSNextProto: HTTP/2 is enabled only if the map
		// has a non-nil "h2" entry.
		if t.TLSNextProto["h2"] != nil {
			p.SetHTTP2(true)
		}
	case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
		// The caller customized TLS or dialing and did not opt in with
		// ForceAttemptHTTP2: deliberately leave HTTP/2 disabled.
	case http2client.Value() == "0":
		// HTTP/2 disabled via the godebug setting: leave it off.
	default:
		p.SetHTTP2(true)
	}
	return p
}
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
523 return envProxyFunc()(req.URL)
524 }
525
526
527
528 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
529 return func(*Request) (*url.URL, error) {
530 return fixedURL, nil
531 }
532 }
533
534
535
536
// transportRequest wraps a *Request with the per-attempt state the Transport
// needs. roundTrip creates a fresh one for every attempt, including retries.
type transportRequest struct {
	*Request                        // the request being sent
	extra    Header                 // additional headers, created lazily by extraHeaders
	trace    *httptrace.ClientTrace // client trace taken from the request context; may be nil

	ctx    context.Context         // cancelable context for this request
	cancel context.CancelCauseFunc // cancels ctx with a cause

	mu  sync.Mutex // guards err
	err error      // first error recorded through setError
}
548
549 func (tr *transportRequest) extraHeaders() Header {
550 if tr.extra == nil {
551 tr.extra = make(Header)
552 }
553 return tr.extra
554 }
555
556 func (tr *transportRequest) setError(err error) {
557 tr.mu.Lock()
558 if tr.err == nil {
559 tr.err = err
560 }
561 tr.mu.Unlock()
562 }
563
564
565
566 func (t *Transport) useRegisteredProtocol(req *Request) bool {
567 if req.URL.Scheme == "https" && req.requiresHTTP1() {
568
569
570
571
572 return false
573 }
574 return true
575 }
576
577
578
579
580 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
581 if !t.useRegisteredProtocol(req) {
582 return nil
583 }
584 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
585 return altProto[req.URL.Scheme]
586 }
587
588 func validateHeaders(hdrs Header) string {
589 for k, vv := range hdrs {
590 if !httpguts.ValidHeaderFieldName(k) {
591 return fmt.Sprintf("field name %q", k)
592 }
593 for _, v := range vv {
594 if !httpguts.ValidHeaderFieldValue(v) {
595
596
597 return fmt.Sprintf("field value for %q", k)
598 }
599 }
600 }
601 return ""
602 }
603
604
// roundTrip executes a single HTTP transaction for req and returns the
// response. It validates the request, gives a registered alternate protocol
// the first chance to handle it, and otherwise obtains a connection and sends
// the request, retrying on a new connection when shouldRetryRequest allows it.
//
// The request body is closed on every error path taken before a connection
// round trip is attempted. On success the returned Response has its Request
// field set to the caller's original req.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Validate the outgoing headers. Here err is the string returned
		// by validateHeaders, not the named error result.
		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		// Validate the outgoing trailers too.
		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request for Response.Request and cancel bookkeeping;
	// from here on req may be a copy whose Body tracks reads and closes.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate protocol declined the request; rewind the body in
		// case it was consumed, then fall through to the normal path.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// Derive the transport's own cancelable context from the request
	// context. All three cancellation sources (the request context,
	// Request.Cancel and Transport.CancelRequest) funnel into it.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Convert Request.Cancel into context cancellation.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Convert Transport.CancelRequest into context cancellation. The
	// returned cancel also removes the request from t.reqCanceler.
	cancel = t.prepareTransportCancel(origReq, cancel)

	// On any error return, cancel the context with that error as the cause.
	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		// Stop immediately if the request was canceled before this attempt.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// A fresh transportRequest is built for every attempt.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Obtain a connection for this connect method, either an idle one
		// or a newly dialed one.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is served by an alternate RoundTripper.
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// The alternate RoundTripper owns the request from here
				// on, so release the transport's context and its
				// reqCanceler entry now.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed. Clean up and decide whether to retry.
		if http2isNoCachedConnError(err) {
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retryable: unwrap the internal marker error types so
			// the caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
				// Close the request body if the round trip did not
				// already do so.
				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Rewind the body if possible before the next attempt.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
765
// http2isNoCachedConnError reports whether err carries the
// IsHTTP2NoCachedConnError marker method. The check is on err's dynamic type
// only; wrapped errors are not unwrapped. A nil err yields false.
func http2isNoCachedConnError(err error) bool {
	switch err.(type) {
	case interface{ IsHTTP2NoCachedConnError() }:
		return true
	}
	return false
}
770
771 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
772 select {
773 case <-req.Cancel:
774 cancel(errRequestCanceled)
775 case <-ctx.Done():
776 }
777 }
778
// errCannotRewind is returned by rewindBody when a request body has already
// been read or closed and the request has no GetBody function to recreate it.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
780
// readTrackingBody wraps an io.ReadCloser and remembers whether it has been
// read from or closed, so that the transport can tell whether a request body
// needs to be rewound before a retry.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set on the first call to Read
	didClose atomic.Bool // set on the first call to Close
}

// Read marks the body as read and forwards to the wrapped reader.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close closes the wrapped body exactly once. The first call returns the
// wrapped Close result; every later call is a no-op returning nil.
func (r *readTrackingBody) Close() error {
	if alreadyClosed := r.didClose.Swap(true); alreadyClosed {
		return nil
	}
	return r.ReadCloser.Close()
}
798
799
800
801
802
803 func setupRewindBody(req *Request) *Request {
804 if req.Body == nil || req.Body == NoBody {
805 return req
806 }
807 newReq := *req
808 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
809 return &newReq
810 }
811
812
813
814
815
816 func rewindBody(req *Request) (rewound *Request, err error) {
817 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
818 return req, nil
819 }
820 if !req.Body.(*readTrackingBody).didClose.Load() {
821 req.closeBody()
822 }
823 if req.GetBody == nil {
824 return nil, errCannotRewind
825 }
826 body, err := req.GetBody()
827 if err != nil {
828 return nil, err
829 }
830 newReq := *req
831 newReq.Body = &readTrackingBody{ReadCloser: body}
832 return &newReq, nil
833 }
834
835
836
837
838 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
839 if http2isNoCachedConnError(err) {
840
841
842
843
844
845
846 return true
847 }
848 if err == errMissingHost {
849
850 return false
851 }
852 if !pc.isReused() {
853
854
855
856
857
858
859
860 return false
861 }
862 if _, ok := err.(nothingWrittenError); ok {
863
864
865 return req.outgoingLength() == 0 || req.GetBody != nil
866 }
867 if !req.isReplayable() {
868
869 return false
870 }
871 if _, ok := err.(transportReadFromServerError); ok {
872
873
874 return true
875 }
876 if err == errServerClosedIdle {
877
878
879
880 return true
881 }
882 return false
883 }
884
885
// ErrSkipAltProtocol is the sentinel error an alternate-protocol
// RoundTripper returns from RoundTrip to decline a request. When roundTrip
// sees it, the Transport handles the request itself instead.
var ErrSkipAltProtocol = internal.ErrSkipAltProtocol
887
888
889
890
891
892
893
894
895
896
897
898 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
899 if err := t.registerProtocol(scheme, rt); err != nil {
900 panic(err)
901 }
902 }
903
904 func (t *Transport) registerProtocol(scheme string, rt RoundTripper) error {
905 t.altMu.Lock()
906 defer t.altMu.Unlock()
907
908 if scheme == "http/3" {
909 var ok bool
910 if t.h3transport, ok = rt.(dialClientConner); !ok {
911 panic("http: HTTP/3 RoundTripper does not implement DialClientConn")
912 }
913 }
914
915 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
916 if _, exists := oldMap[scheme]; exists {
917 return errors.New("protocol " + scheme + " already registered")
918 }
919 newMap := maps.Clone(oldMap)
920 if newMap == nil {
921 newMap = make(map[string]RoundTripper)
922 }
923 newMap[scheme] = rt
924 t.altProto.Store(newMap)
925 return nil
926 }
927
928
929
930
931
// CloseIdleConnections closes every connection currently sitting in the
// idle pool, cancels in-progress dials that no longer have a waiting
// request, and forwards the call to the HTTP/2 and HTTP/3 transports when
// they are present.
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)

	// Detach the whole idle pool under the lock, then close the
	// connections after releasing it. closeIdle makes tryPutIdleConn
	// reject connections until queueForIdleConn clears the flag.
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.closeIdle = true
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}

	// Cancel dials whose requester has already gone away; dials that a
	// request is still waiting on are left running.
	t.connsPerHostMu.Lock()
	t.dialsInProgress.all(func(w *wantConn) {
		if w.cancelCtx != nil && !w.waiting() {
			w.cancelCtx()
		}
	})
	t.connsPerHostMu.Unlock()

	if t2 := t.h2transport; t2 != nil {
		t2.CloseIdleConnections()
	}
	if cc, ok := t.h3transport.(closeIdleConnectionser); ok {
		cc.CloseIdleConnections()
	}
}
959
960
961 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
962
963
964
965
966
967
968 cancel := func(err error) {
969 origCancel(err)
970 t.reqMu.Lock()
971 delete(t.reqCanceler, req)
972 t.reqMu.Unlock()
973 }
974 t.reqMu.Lock()
975 if t.reqCanceler == nil {
976 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
977 }
978 t.reqCanceler[req] = cancel
979 t.reqMu.Unlock()
980 return cancel
981 }
982
983
984
985
986
987
988
989 func (t *Transport) CancelRequest(req *Request) {
990 t.reqMu.Lock()
991 cancel := t.reqCanceler[req]
992 t.reqMu.Unlock()
993 if cancel != nil {
994 cancel(errRequestCanceled)
995 }
996 }
997
998
999
1000
1001
// Cached state for ProxyFromEnvironment.
var (
	// envProxyOnce guards the one-time initialization of envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the proxy function built from the environment
	// by envProxyFunc on first use.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
1006
1007
1008
1009 func envProxyFunc() func(*url.URL) (*url.URL, error) {
1010 envProxyOnce.Do(func() {
1011 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
1012 })
1013 return envProxyFuncValue
1014 }
1015
1016
1017 func resetProxyConfig() {
1018 envProxyOnce = sync.Once{}
1019 envProxyFuncValue = nil
1020 }
1021
1022 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
1023 cm.targetScheme = treq.URL.Scheme
1024 cm.targetAddr = canonicalAddr(treq.URL)
1025 if t.Proxy != nil {
1026 cm.proxyURL, err = t.Proxy(treq.Request)
1027 }
1028 cm.onlyH1 = treq.requiresHTTP1()
1029 return cm, err
1030 }
1031
1032
1033
1034 func (cm *connectMethod) proxyAuth() string {
1035 if cm.proxyURL == nil {
1036 return ""
1037 }
1038 if u := cm.proxyURL.User; u != nil {
1039 username := u.Username()
1040 password, _ := u.Password()
1041 return "Basic " + basicAuth(username, password)
1042 }
1043 return ""
1044 }
1045
1046
// Errors describing why a connection was not kept in, or was removed from,
// the idle pool. Those prefixed "putIdleConn" are returned by tryPutIdleConn.
var (
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
	errCloseIdle          = errors.New("http: putIdleConn: CloseIdleConnections was called")
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")
	errCloseIdleConns     = errors.New("http: CloseIdleConnections called")
	errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout    = errors.New("http: idle connection timeout")

	// errServerClosedIdle signals that the server closed a connection
	// while it was idle. shouldRetryRequest treats it as retryable for
	// replayable requests on reused connections.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1063
1064
1065
1066
1067
1068
1069
1070
1071
// transportReadFromServerError wraps an error encountered while reading from
// the server. It exists so callers can recognize this class of failure by
// type; the underlying error is available through Unwrap.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the underlying read error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the underlying error's text.
func (e transportReadFromServerError) Error() string {
	const prefix = "net/http: Transport failed to read from server: "
	return prefix + fmt.Sprint(e.err)
}
1081
1082 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1083 if err := t.tryPutIdleConn(pconn); err != nil {
1084 pconn.close(err)
1085 }
1086 }
1087
1088 func (t *Transport) maxIdleConnsPerHost() int {
1089 if v := t.MaxIdleConnsPerHost; v != 0 {
1090 return v
1091 }
1092 return DefaultMaxIdleConnsPerHost
1093 }
1094
1095
1096
1097
1098
1099
// tryPutIdleConn makes pconn available for reuse. It either hands pconn
// directly to a request that is waiting for a connection, or adds it to the
// idle pool. A nil return means pconn was accepted; a non-nil error explains
// why it was not, and the caller is expected to close pconn (see
// putOrCloseIdleConn).
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()
	if pconn.isClientConn {
		// Connections flagged isClientConn are not pooled by the
		// Transport. Just mark the connection as no longer in flight and
		// signal availability on availch. The state hook is deferred
		// first, so it runs after pconn.mu has been released.
		defer pconn.internalStateHook()
		pconn.mu.Lock()
		defer pconn.mu.Unlock()
		if !pconn.inFlight {
			panic("pconn is not in flight")
		}
		pconn.inFlight = false
		select {
		case pconn.availch <- struct{}{}:
		default:
			panic("unable to make pconn available")
		}
		return nil
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// An alternate-protocol connection (pconn.alt != nil) that is already
	// tracked in the idle LRU stays where it is; nothing more to do.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Before pooling, try to hand the connection to a request that is
	// already waiting for one with the same key.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// An ordinary connection goes to exactly one waiter: the
			// first that still accepts delivery.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// An alternate-protocol connection is offered to every
			// waiter, and is still added to the idle pool below (done
			// stays false).
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		// wantConnQueue is a value type, so write the updated queue back.
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Sanity check: the same connection must never be pooled twice.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global idle limit by evicting the least recently used
	// connection.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm (or re-arm) the idle timer. Alternate-protocol connections are
	// excluded here.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1208
1209
1210
1211
// queueForIdleConn tries to satisfy w from the idle pool. It reports whether
// an idle connection was delivered to w. If none was available, w is queued
// so that a connection returned later by tryPutIdleConn can be handed to it.
//
// Calling it with a nil w only clears the closeIdle flag.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A new request re-enables idle pooling after CloseIdleConnections.
	t.closeIdle = false

	if w == nil {
		// Nothing to deliver to; the caller only wanted the flag reset.
		return false
	}

	// Connections that went idle before oldTime have exceeded
	// IdleConnTimeout. A zero oldTime disables the age check.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Look for the most recently used idle connection for this key.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		// This local shadows the named result for the rest of the block.
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading so that the
			// comparison uses wall-clock time.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Close asynchronously: closeConnIfStillIdle is run in
				// its own goroutine because idleMu is held here.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Drop the unusable connection from the list and try
				// the next most recent one.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Alternate-protocol connections can be shared,
					// so they stay in the idle list.
				} else {
					// An ordinary connection is now in use: remove it
					// from the LRU and the idle list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w to receive one later.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1297
1298
1299 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1300 if pconn.isClientConn {
1301 return true
1302 }
1303 t.idleMu.Lock()
1304 defer t.idleMu.Unlock()
1305 return t.removeIdleConnLocked(pconn)
1306 }
1307
1308
1309 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1310 if pconn.idleTimer != nil {
1311 pconn.idleTimer.Stop()
1312 }
1313 t.idleLRU.remove(pconn)
1314 key := pconn.cacheKey
1315 pconns := t.idleConn[key]
1316 var removed bool
1317 switch len(pconns) {
1318 case 0:
1319
1320 case 1:
1321 if pconns[0] == pconn {
1322 delete(t.idleConn, key)
1323 removed = true
1324 }
1325 default:
1326 for i, v := range pconns {
1327 if v != pconn {
1328 continue
1329 }
1330
1331
1332 copy(pconns[i:], pconns[i+1:])
1333 t.idleConn[key] = pconns[:len(pconns)-1]
1334 removed = true
1335 break
1336 }
1337 }
1338 return removed
1339 }
1340
// zeroDialer is the zero-value net.Dialer that Transport.dial falls back to
// when neither DialContext nor Dial is set.
var zeroDialer net.Dialer
1342
1343 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1344 if t.DialContext != nil {
1345 c, err := t.DialContext(ctx, network, addr)
1346 if c == nil && err == nil {
1347 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1348 }
1349 return c, err
1350 }
1351 if t.Dial != nil {
1352 c, err := t.Dial(network, addr)
1353 if c == nil && err == nil {
1354 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1355 }
1356 return c, err
1357 }
1358 return zeroDialer.DialContext(ctx, network, addr)
1359 }
1360
1361
1362
1363
1364
1365
1366
// wantConn records a request's wish for a connection. It is created by
// getConn and resolved at most once: by tryDeliver, which sends a connection
// or an error on result, or by cancel, which abandons the wish.
type wantConn struct {
	cm  connectMethod    // how to reach the target
	key connectMethodKey // cm.key(), used to index the idle and wait maps

	// Hooks called around dialing; getConn sets them to test hooks.
	// queueForDial calls beforeDial and dialConnFor defers afterDial.
	beforeDial func()
	afterDial  func()

	mu        sync.Mutex         // guards ctx, done and sends on / closing of result
	ctx       context.Context    // context for the dial; set to nil once resolved or canceled
	cancelCtx context.CancelFunc // cancels ctx; set to nil by startDialConnForLocked's goroutine after the dial returns
	done      bool               // true once resolved or canceled
	result    chan connOrError   // buffered (capacity 1); receives the outcome, then is closed
}
1383
// connOrError is the outcome delivered on wantConn.result: exactly one of pc
// and err is set (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn // the delivered connection, or nil on error
	err    error        // the failure, or nil on success
	idleAt time.Time    // when pc became idle; zero if it did not come from the idle pool
}
1389
1390
1391 func (w *wantConn) waiting() bool {
1392 w.mu.Lock()
1393 defer w.mu.Unlock()
1394
1395 return !w.done
1396 }
1397
1398
1399 func (w *wantConn) getCtxForDial() context.Context {
1400 w.mu.Lock()
1401 defer w.mu.Unlock()
1402
1403 return w.ctx
1404 }
1405
1406
// tryDeliver attempts to resolve w with either a connection or an error and
// reports whether it did. It returns false if w was already resolved or
// canceled. Exactly one of pc and err must be non-nil; anything else is a
// programming error and panics.
func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.done {
		return false
	}
	if (pc == nil) == (err == nil) {
		panic("net/http: internal error: misuse of tryDeliver")
	}
	// Drop the dial context and mark w resolved before publishing.
	w.ctx = nil
	w.done = true

	// getConn creates result with capacity 1 and the done flag ensures at
	// most one send, so this send does not block while w.mu is held.
	w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
	close(w.result)

	return true
}
1425
1426
1427
// cancel marks w as no longer wanting a connection. If a connection had
// already been delivered but not yet consumed, it is taken back and returned
// to the transport's idle pool (or closed if the pool will not take it).
func (w *wantConn) cancel(t *Transport) {
	w.mu.Lock()
	var pc *persistConn
	if w.done {
		// Already resolved: drain the result if nobody has received it
		// yet. ok is false when the channel is closed and empty, meaning
		// the result was consumed or w was canceled before.
		if r, ok := <-w.result; ok {
			pc = r.pc
		}
	} else {
		// Not resolved yet: close result so receivers stop waiting.
		close(w.result)
	}
	w.ctx = nil
	w.done = true
	w.mu.Unlock()

	// Only ordinary connections are put back. Alternate-protocol
	// connections (pc.alt != nil) are deliberately not re-pooled here.
	if pc != nil && pc.alt == nil {
		t.putOrCloseIdleConn(pc)
	}
}
1449
1450
// wantConnQueue is a FIFO queue of wantConns built from two slices. New
// entries are appended to tail. Entries are popped from head starting at
// headPos; when head is exhausted, popFront swaps tail in as the new head and
// reuses the old head's storage as the new, empty tail.
//
// The zero value is an empty queue. It is stored by value in maps, so callers
// must write a modified queue back to the map.
type wantConnQueue struct {
	head    []*wantConn // entries to pop next; valid from index headPos
	headPos int         // index of the front entry within head
	tail    []*wantConn // entries pushed since the last swap
}
1466
1467
1468 func (q *wantConnQueue) len() int {
1469 return len(q.head) - q.headPos + len(q.tail)
1470 }
1471
1472
// pushBack appends w to the back of the queue.
func (q *wantConnQueue) pushBack(w *wantConn) {
	q.tail = append(q.tail, w)
}
1476
1477
// popFront removes and returns the entry at the front of the queue, or
// returns nil if the queue is empty.
func (q *wantConnQueue) popFront() *wantConn {
	if q.headPos >= len(q.head) {
		if len(q.tail) == 0 {
			return nil
		}
		// head is exhausted: make tail the new head and recycle the old
		// head's backing array as the new, empty tail.
		q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
	}
	w := q.head[q.headPos]
	// Clear the slot so the queue does not keep the entry reachable.
	q.head[q.headPos] = nil
	q.headPos++
	return w
}
1491
1492
1493 func (q *wantConnQueue) peekFront() *wantConn {
1494 if q.headPos < len(q.head) {
1495 return q.head[q.headPos]
1496 }
1497 if len(q.tail) > 0 {
1498 return q.tail[0]
1499 }
1500 return nil
1501 }
1502
1503
1504
1505 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1506 for {
1507 w := q.peekFront()
1508 if w == nil || w.waiting() {
1509 return cleaned
1510 }
1511 q.popFront()
1512 cleaned = true
1513 }
1514 }
1515
1516
1517 func (q *wantConnQueue) cleanFrontCanceled() {
1518 for {
1519 w := q.peekFront()
1520 if w == nil || w.cancelCtx != nil {
1521 return
1522 }
1523 q.popFront()
1524 }
1525 }
1526
1527
1528
1529 func (q *wantConnQueue) all(f func(*wantConn)) {
1530 for _, w := range q.head[q.headPos:] {
1531 f(w)
1532 }
1533 for _, w := range q.tail {
1534 f(w)
1535 }
1536 }
1537
1538 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1539 if t.DialTLSContext != nil {
1540 conn, err = t.DialTLSContext(ctx, network, addr)
1541 } else {
1542 conn, err = t.DialTLS(network, addr)
1543 }
1544 if conn == nil && err == nil {
1545 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1546 }
1547 return
1548 }
1549
1550
1551
1552
1553
// getConn obtains a connection for the given connect method, either by
// taking one from the idle pool or by dialing. It blocks until a connection
// or error is delivered, or until the request's transport context
// (treq.ctx) is done. If getConn returns an error, the pending wantConn is
// canceled so that a connection delivered later is returned to the pool.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial gets its own cancelable context that keeps the request
	// context's values but is detached from its cancellation, so canceling
	// the request does not by itself abort the dial.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t)
		}
	}()

	// Try the idle pool first; start (or queue) a dial only if nothing was
	// delivered. When no idle connection is available w is also left on
	// the idle wait queue, so whichever source delivers first wins.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for completion or cancellation.
	select {
	case r := <-w.result:
		// Report GotConn only for ordinary connections; it is skipped
		// for alternate-protocol connections (r.pc.alt != nil).
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {
			// If the request has been canceled in the meantime, prefer
			// reporting the cancellation cause over the dial error.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// Not canceled: return r.err below.
			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1629
1630
1631
// queueForDial starts a dial for w, or queues w until a per-host connection
// slot becomes free when MaxConnsPerHost has been reached.
func (t *Transport) queueForDial(w *wantConn) {
	w.beforeDial()

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()

	// No per-host limit: dial immediately without any accounting.
	if t.MaxConnsPerHost <= 0 {
		t.startDialConnForLocked(w)
		return
	}

	// Below the limit: claim a slot and dial. Reading from a nil map is
	// fine; the map is allocated lazily before the first write.
	if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
		if t.connsPerHost == nil {
			t.connsPerHost = make(map[connectMethodKey]int)
		}
		t.connsPerHost[w.key] = n + 1
		t.startDialConnForLocked(w)
		return
	}

	// At the limit: wait for decConnsPerHost to hand over a slot.
	if t.connsPerHostWait == nil {
		t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.connsPerHostWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.connsPerHostWait[w.key] = q
}
1660
1661
1662
// startDialConnForLocked records w in dialsInProgress and starts a goroutine
// that dials on its behalf. The caller must hold t.connsPerHostMu.
func (t *Transport) startDialConnForLocked(w *wantConn) {
	// Drop leading entries whose dials have already finished (their
	// cancelCtx has been cleared) before adding the new one.
	t.dialsInProgress.cleanFrontCanceled()
	t.dialsInProgress.pushBack(w)
	go func() {
		t.dialConnFor(w)
		// Clearing cancelCtx marks the dial as finished. It is done under
		// connsPerHostMu because CloseIdleConnections and
		// cleanFrontCanceled read the field under that lock.
		t.connsPerHostMu.Lock()
		defer t.connsPerHostMu.Unlock()
		w.cancelCtx = nil
	}()
}
1673
1674
1675
1676
// dialConnFor dials a connection on behalf of w and delivers the result to
// it. If w no longer wants the connection, or the connection can be shared,
// a successfully dialed connection is put into the idle pool. A failed or
// skipped dial releases the per-host slot that was reserved for it.
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()
	ctx := w.getCtxForDial()
	if ctx == nil {
		// w was resolved or canceled before the dial started.
		t.decConnsPerHost(w.key)
		return
	}

	const isClientConn = false
	pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
	delivered := w.tryDeliver(pc, err, time.Time{})
	if err == nil && (!delivered || pc.alt != nil) {
		// The connection was dialed successfully but either nobody took
		// it, or it is an alternate-protocol connection that other
		// requests can share: make it available through the idle pool.
		t.putOrCloseIdleConn(pc)
	}
	if err != nil {
		t.decConnsPerHost(w.key)
	}
}
1698
1699
1700
// decConnsPerHost releases one per-host connection slot for key. If a
// request is waiting for a slot, the slot is handed straight to it by
// starting its dial, and the count stays the same; otherwise the count is
// decremented. It is a no-op when MaxConnsPerHost is not positive, because
// no accounting is done in that case.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// Releasing a slot that was never claimed indicates a bookkeeping
		// bug in the transport.
		panic("net/http: internal error: connCount underflow")
	}

	// Hand the slot to the first request that is still waiting, skipping
	// any that have since been resolved or canceled.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a value copy of the map entry, so write the updated
			// queue back.
			t.connsPerHostWait[key] = q
		}
		if done {
			return
		}
	}

	// Nobody took the slot: decrement the count, dropping the entry at zero.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1748
1749
1750
1751
// addTLS layers a TLS client connection over pconn.conn and performs the
// handshake. name is used as the TLS ServerName when the Transport's TLS
// configuration does not already specify one.
//
// On success pconn.conn is replaced by the TLS connection and pconn.tlsState
// records the negotiated connection state. On failure the plain connection
// is closed and the error is returned. The TLSHandshakeStart and
// TLSHandshakeDone hooks of trace are called when set.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Start from the Transport's TLS config (via cloneTLSConfig) and apply
	// per-connection settings.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		// HTTP/1-only connection: offer no ALPN protocols.
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// errc has room for both possible senders (the timeout callback and the
	// handshake goroutine), so neither can block after this function returns.
	errc := make(chan error, 2)
	var timer *time.Timer // fires tlsHandshakeTimeoutError if the handshake takes too long
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The timeout fired first. The connection is now closed, so
			// wait for the handshake goroutine to report back before
			// returning.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1800
// erringRoundTripper is implemented by alternate-protocol RoundTrippers that
// can report a construction error. dialConn checks for it on the value
// returned by a TLSNextProto function and, when present, returns
// RoundTripErr() instead of a connection.
type erringRoundTripper interface {
	RoundTripErr() error
}
1804
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It defaults to context.WithTimeout; the name
// suggests tests may substitute it.
var testHookProxyConnectTimeout = context.WithTimeout
1806
// dialConn establishes a new connection described by cm and returns it as a
// persistConn.
//
// Depending on cm and the Transport configuration it dials directly or via a
// proxy (SOCKS5, HTTP, or HTTPS CONNECT), adds TLS where required, and then
// either hands the connection to an alternate protocol implementation
// (HTTP/3, HTTP/2, unencrypted HTTP/2, or a TLSNextProto entry) or starts the
// HTTP/1 read and write loops.
//
// isClientConn and internalStateHook are stored on the returned persistConn
// (and passed to the HTTP/2 or HTTP/3 constructors where used).
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
	// HTTP/3 is delegated entirely to t.h3transport and cannot be mixed with
	// any other protocol on the same Transport.
	if p := t.protocols(); p.http3() {
		if p.HTTP1() || p.HTTP2() || p.UnencryptedHTTP2() {
			return nil, errors.New("http: when using HTTP3, Transport.Protocols must contain only HTTP3")
		}
		if t.h3transport == nil {
			return nil, errors.New("http: Transport.Protocols contains HTTP3, but Transport does not support HTTP/3")
		}
		rt, err := t.h3transport.DialClientConn(ctx, cm.addr(), cm.proxyURL, internalStateHook)
		if err != nil {
			return nil, err
		}
		return &persistConn{
			t:        t,
			cacheKey: cm.key(),
			alt:      rt,
		}, nil
	}

	pconn = &persistConn{
		t:                 t,
		cacheKey:          cm.key(),
		reqch:             make(chan requestAndChan, 1),
		writech:           make(chan writeRequest, 1),
		closech:           make(chan struct{}),
		writeErrCh:        make(chan error, 1),
		writeLoopDone:     make(chan struct{}),
		isClientConn:      isClientConn,
		internalStateHook: internalStateHook,
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr tags errors from the first hop as "proxyconnect" errors when
	// that hop is a proxy; otherwise it returns err unchanged.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error, per Issue 16997
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	// First hop: connect to cm.addr(), which is the proxy when one is
	// configured and the target otherwise.
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// The custom dialer returned a *tls.Conn: run the handshake
			// here so the trace hooks fire and tlsState can be recorded.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup, chosen by proxy scheme and target scheme.
	switch {
	case cm.proxyURL == nil:
		// No proxy: nothing more to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		// SOCKS5: negotiate a tunnel to the target over the existing conn,
		// offering username/password auth when the proxy URL carries
		// user info.
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests are sent to the
		// proxy itself, with Proxy-Authorization added per request when
		// the proxy URL carries credentials.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: issue a CONNECT request to open
		// a tunnel to the target.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone before mutating so the caller-provided header is not
			// modified.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange to one minute (or less, if ctx ends
		// sooner), so a proxy that accepts the connection but never
		// answers cannot stall the dial indefinitely.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the CONNECT request and read the response on a separate
		// goroutine so the select below can abandon it on timeout.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// Cap the number of bytes read for the proxy's response at
			// t.maxHeaderResponseSize().
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing conn unblocks the goroutine; wait for it so resp
			// and err are no longer being written.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err now set
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the first space
			// of resp.Status) as the error.
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With a proxy tunnel in place, add the TLS layer to the target.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 applies only to non-TLS connections on a Transport
	// configured for UnencryptedHTTP2 without HTTP1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()

	// For client conns speaking HTTP/2, build the connection via the
	// registered "https" alternate protocol's NewClientConn.
	if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
		altProto, _ := t.altProto.Load().(map[string]RoundTripper)
		h2, ok := altProto["https"].(newClientConner)
		if !ok {
			return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
		}
		alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
		if err != nil {
			pconn.conn.Close()
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
	}

	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// pconn.conn was closed by next (http2configureTransports.upgradeFn).
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// A mutually negotiated ALPN protocol with a registered TLSNextProto
	// handler takes over the connection.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// pconn.conn was closed by next (http2configureTransports.upgradeFn).
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// Plain HTTP/1: set up buffered I/O through pconn (so reads and writes
	// are accounted for) and start the per-connection loops.
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
2058
2059
2060
2061
2062
2063
2064
// persistConnWriter is the writer underneath pc.bw (see dialConn). It
// forwards writes to pc.conn and adds the number of bytes written to
// pc.nwrite, which mapRoundTripError and writeLoop use to tell whether any
// bytes of a request were written.
type persistConnWriter struct {
	pc *persistConn
}
2068
2069 func (w persistConnWriter) Write(p []byte) (n int, err error) {
2070 n, err = w.pc.conn.Write(p)
2071 w.pc.nwrite += int64(n)
2072 return
2073 }
2074
2075
2076
2077
2078 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
2079 n, err = io.Copy(w.pc.conn, r)
2080 w.pc.nwrite += n
2081 return
2082 }
2083
// Compile-time check that *persistConnWriter implements io.ReaderFrom.
var _ io.ReaderFrom = (*persistConnWriter)(nil)
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
// connectMethod describes how to reach a target: directly or through a
// proxy, and with which target scheme and address. Its key method maps it to
// the connectMethodKey used to index connections.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil for no proxy, else full proxy URL
	targetScheme string   // "http" or "https"
	// targetAddr is the address of the final destination. For an http or
	// https proxy with an "http" target it is omitted from the key (see
	// key), since the proxy connection is not specific to one target.
	targetAddr string
	onlyH1     bool // when set, addTLS offers no ALPN protocols (HTTP/1 only)
}
2112
2113 func (cm *connectMethod) key() connectMethodKey {
2114 proxyStr := ""
2115 targetAddr := cm.targetAddr
2116 if cm.proxyURL != nil {
2117 proxyStr = cm.proxyURL.String()
2118 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2119 targetAddr = ""
2120 }
2121 }
2122 return connectMethodKey{
2123 proxy: proxyStr,
2124 scheme: cm.targetScheme,
2125 addr: targetAddr,
2126 onlyH1: cm.onlyH1,
2127 }
2128 }
2129
2130
2131 func (cm *connectMethod) scheme() string {
2132 if cm.proxyURL != nil {
2133 return cm.proxyURL.Scheme
2134 }
2135 return cm.targetScheme
2136 }
2137
2138
2139 func (cm *connectMethod) addr() string {
2140 if cm.proxyURL != nil {
2141 return canonicalAddr(cm.proxyURL)
2142 }
2143 return cm.targetAddr
2144 }
2145
2146
2147
2148 func (cm *connectMethod) tlsHost() string {
2149 h := cm.targetAddr
2150 return removePort(h)
2151 }
2152
2153
2154
2155
// connectMethodKey is the comparable, map-key form of a connectMethod.
// proxy is the proxy URL as a string (empty for no proxy); scheme and addr
// describe the target; onlyH1 mirrors connectMethod.onlyH1.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String formats k as "proxy|scheme|addr", with ",h1" appended to the scheme
// when onlyH1 is set.
func (k connectMethodKey) String() string {
	scheme := k.scheme
	if k.onlyH1 {
		scheme += ",h1"
	}
	return fmt.Sprintf("%s|%s|%s", k.proxy, scheme, k.addr)
}
2169
2170
2171
// persistConn wraps a connection held by a Transport. For HTTP/1 it carries
// the buffered reader/writer and the channels that connect roundTrip with
// the readLoop and writeLoop goroutines.
type persistConn struct {
	// alt optionally specifies an alternate RoundTripper for this
	// connection (set by dialConn for HTTP/2, HTTP/3 and TLSNextProto
	// protocols). When it is non-nil, dialConn leaves the HTTP/1 fields
	// below unset.
	alt RoundTripper

	t            *Transport
	cacheKey     connectMethodKey
	conn         net.Conn
	tlsState     *tls.ConnectionState
	br           *bufio.Reader       // reads from conn via persistConn.Read
	bw           *bufio.Writer       // writes to conn via persistConnWriter
	nwrite       int64               // bytes written to conn
	reqch        chan requestAndChan // sent by roundTrip; received by readLoop
	writech      chan writeRequest   // sent by roundTrip; received by writeLoop
	closech      chan struct{}       // closed by closeLocked for HTTP/1 conns
	availch      chan struct{}       // received from in waitForAvailability; NOTE(review): sender is outside this view
	isProxy      bool                // set by dialConn for plain HTTP via an HTTP(S) proxy
	sawEOF       bool                // set by Read when conn returns io.EOF
	isClientConn bool                // as passed to dialConn
	readLimit    int64               // bytes Read may still return; reset by readLoop

	// writeErrCh carries the result of each request write (usually nil)
	// from writeLoop to wroteRequest, which readLoop consults before
	// returning the connection to the idle pool.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop returns

	// NOTE(review): idleAt and idleTimer are not used in this part of the
	// file; presumably they track idle-pool state — confirm.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // guards the fields below (see isBroken, canceled, roundTrip)
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // set non-nil by closeLocked, before closech is closed
	canceledErr          error      // set by cancelRequest
	reused               bool       // set by markReused
	reserved             bool       // cleared by roundTrip on client conns; NOTE(review): set outside this view
	inFlight             bool       // set by roundTrip on client conns
	internalStateHook    func()     // optional; called when readLoop exits

	// mutateHeaderFunc is an optional func applied to the extra headers of
	// each outbound request in roundTrip (dialConn uses it to add
	// Proxy-Authorization). closeLocked resets it to nil.
	mutateHeaderFunc func(Header)
}
2219
2220 func (pc *persistConn) maxHeaderResponseSize() int64 {
2221 return pc.t.maxHeaderResponseSize()
2222 }
2223
2224 func (pc *persistConn) Read(p []byte) (n int, err error) {
2225 if pc.readLimit <= 0 {
2226 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2227 }
2228 if int64(len(p)) > pc.readLimit {
2229 p = p[:pc.readLimit]
2230 }
2231 n, err = pc.conn.Read(p)
2232 if err == io.EOF {
2233 pc.sawEOF = true
2234 }
2235 pc.readLimit -= int64(n)
2236 return
2237 }
2238
2239
2240 func (pc *persistConn) isBroken() bool {
2241 pc.mu.Lock()
2242 b := pc.closed != nil
2243 pc.mu.Unlock()
2244 return b
2245 }
2246
2247
2248
2249 func (pc *persistConn) canceled() error {
2250 pc.mu.Lock()
2251 defer pc.mu.Unlock()
2252 return pc.canceledErr
2253 }
2254
2255
2256 func (pc *persistConn) isReused() bool {
2257 pc.mu.Lock()
2258 r := pc.reused
2259 pc.mu.Unlock()
2260 return r
2261 }
2262
// cancelRequest records err as the cancellation cause (later returned by
// canceled) and closes the connection with errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2269
2270
2271
2272
2273 func (pc *persistConn) closeConnIfStillIdle() {
2274 t := pc.t
2275 t.idleMu.Lock()
2276 defer t.idleMu.Unlock()
2277 if _, ok := t.idleLRU.m[pc]; !ok {
2278
2279 return
2280 }
2281 t.removeIdleConnLocked(pc)
2282 pc.close(errIdleConnTimeout)
2283 }
2284
2285
2286
2287
2288
2289
2290
2291
2292
// mapRoundTripError translates an internal round-trip error into the error
// returned to the caller. It returns nil for a nil err.
//
// Precedence: a cancellation cause recorded on the connection, then an error
// recorded on the request, then err itself. err is wrapped in
// nothingWrittenError when no bytes were written since startBytesWritten,
// and in a "connection broken" error when the connection is closed after
// bytes were written.
func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
	if err == nil {
		return nil
	}

	// Wait for the write loop to finish before looking at pc.nwrite and the
	// request's recorded error, both of which the write loop updates.
	// writeLoopDone is closed when writeLoop returns, and every caller in
	// this view passes an error associated with the connection closing.
	<-pc.writeLoopDone

	// A recorded cancellation cause takes precedence over whatever
	// lower-level error the cancellation produced.
	if cerr := pc.canceled(); cerr != nil {
		return cerr
	}

	// Next, prefer an error recorded on the request itself (writeLoop
	// records request body read errors via setError).
	req.mu.Lock()
	reqErr := req.err
	req.mu.Unlock()
	if reqErr != nil {
		return reqErr
	}

	if err == errServerClosedIdle {
		// Returned unwrapped.
		return err
	}

	if _, ok := err.(transportReadFromServerError); ok {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		// Returned unwrapped.
		return err
	}
	if pc.isBroken() {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
	}
	return err
}
2342
2343
2344
2345
// errCallerOwnsConn is the close error readLoop uses when the response body
// is writable, meaning the caller has taken over the underlying connection.
// closeLocked does not close the net.Conn when given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2347
2348
2349
2350
// maxPostCloseReadBytes is the maximum number of response body bytes that
// maybeDrainBody will read after the caller closed the body early, in an
// attempt to keep the connection reusable.
const maxPostCloseReadBytes = 256 << 10

// maxPostCloseReadTime is the maximum time maybeDrainBody waits for the
// drain to complete.
const maxPostCloseReadTime = 50 * time.Millisecond
2357
2358 func maybeDrainBody(body io.Reader) bool {
2359 drainedCh := make(chan bool, 1)
2360 go func() {
2361 if _, err := io.CopyN(io.Discard, body, maxPostCloseReadBytes+1); err == io.EOF {
2362 drainedCh <- true
2363 } else {
2364 drainedCh <- false
2365 }
2366 }()
2367 select {
2368 case drained := <-drainedCh:
2369 return drained
2370 case <-time.After(maxPostCloseReadTime):
2371 return false
2372 }
2373 }
2374
// readLoop is the HTTP/1 read side of a persistConn, started by dialConn on
// its own goroutine. For each request received on pc.reqch it reads the
// response, delivers it on the request's channel, waits for the body to be
// consumed or closed, and decides whether the connection can be reused. When
// the loop ends the connection is closed and removed from the idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
		if pc.internalStateHook != nil {
			pc.internalStateHook()
		}
	}()

	// tryPutIdleConn tries to return pc to the Transport's idle pool and
	// reports whether that succeeded. On failure the error becomes the
	// loop's close error. The PutIdleConn trace hook is called with the
	// result, except for errKeepAlivesDisabled.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc holds back the goroutine reading or closing a response body
	// (see the bodyEOFSignal callbacks below) until this loop has decided
	// what to do with the connection. Closing it on exit releases any
	// goroutine still waiting.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		// Allow up to the header size limit to be read for the next
		// response.
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no response was expected.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			// Deliver the error unless the caller has already gone; either
			// way the loop ends.
			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		if !hasBody || bodyWritable {
			// No body to wait for (or the caller owns the connection).
			// Put the connection back in the idle pool before delivering
			// the response, so that a caller who immediately issues the
			// next request can find it there. The conditions short-circuit
			// in order, so tryPutIdleConn only runs if all earlier checks
			// pass.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// Now that they've read from the unbuffered channel, they're safely
			// out of the select that also waits on this goroutine to die, so
			// we're allowed to exit now if needed (if alive is false)
			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body. Wrap it so this loop learns when the
		// caller reaches EOF, hits a read error, or closes it early.
		// The channel has room for two sends: one from fn and one from
		// earlyCloseFn.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // released by this loop's send, or by the deferred close(eofc)
				return nil
				// The underlying body is not closed here.
			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					// Prefer a recorded cancellation cause over the raw
					// read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The transport requested gzip itself (see roundTrip), so
			// decompress transparently and hide the encoding details.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {
		case bodyEOF := <-waitForBodyRead:
			// If the body was not read to EOF and is short enough (or of
			// unknown length), try to drain the rest so the connection can
			// still be reused.
			tryDrain := !bodyEOF && resp.ContentLength <= maxPostCloseReadBytes
			if tryDrain {
				// Hand off to the goroutine receiving from eofc before
				// draining the underlying body directly.
				eofc <- struct{}{}
				bodyEOF = maybeDrainBody(body.body)
			}
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if !tryDrain && bodyEOF {
				// Release the reader that is waiting in fn at EOF.
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2560
2561 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2562 if pc.closed != nil {
2563 return
2564 }
2565 if n := pc.br.Buffered(); n > 0 {
2566 buf, _ := pc.br.Peek(n)
2567 if is408Message(buf) {
2568 pc.closeLocked(errServerClosedIdle)
2569 return
2570 } else {
2571 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2572 }
2573 }
2574 if peekErr == io.EOF {
2575
2576 pc.closeLocked(errServerClosedIdle)
2577 } else {
2578 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2579 }
2580 }
2581
2582
2583
2584
// is408Message reports whether buf begins with an HTTP/1.x status line whose
// status code starts with 408, i.e. "HTTP/1." followed by any one byte and
// then " 408".
func is408Message(buf []byte) bool {
	const shortest = "HTTP/1.x 408"
	if len(buf) < len(shortest) {
		return false
	}
	// Byte 7 is the minor version digit and is deliberately not checked.
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2594
2595
2596
2597
// readResponse reads the final HTTP response for rc's request from pc.br,
// skipping over informational (1xx) responses other than 101 Switching
// Protocols. trace is optional.
//
// If the request is waiting on rc.continueCh (Expect: 100-continue), the
// channel is signalled or closed so the write side knows whether to send the
// request body.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to go ahead with the body; signal only once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// treat 101 as a terminal status, see issue 26161
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// A Got1xxResponse hook is installed and accepted this
				// response, so restore the full header size budget for the
				// next one. Without a hook the limit is not reset here, so
				// a stream of 1xx responses counts against a single budget.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// Give the caller a body that can both read from and write to the
		// connection.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// A final response arrived while the writer was still waiting for
		// "100 Continue". If either side asked for the connection to be
		// closed, close the channel (waitForContinue then returns false
		// and the body is not sent); otherwise send on it (waitForContinue
		// returns true and the body is sent).
		if resp.Close || rc.treq.Request.Close {
			close(continueCh) // don't send the body; the connection will close
		} else {
			continueCh <- struct{}{} // send the body
		}
	}

	resp.TLS = pc.tlsState
	return
}
2666
2667
2668
2669
2670 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2671 if continueCh == nil {
2672 return nil
2673 }
2674 return func() bool {
2675 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2676 defer timer.Stop()
2677
2678 select {
2679 case _, ok := <-continueCh:
2680 return ok
2681 case <-timer.C:
2682 return true
2683 case <-pc.closech:
2684 return false
2685 }
2686 }
2687 }
2688
2689 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2690 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2691 if br.Buffered() != 0 {
2692 body.br = br
2693 }
2694 return body
2695 }
2696
2697
2698
2699
2700
2701
// readWriteCloserBody is the Response.Body used for protocol-switch
// responses (see readResponse). Reads are served from br until its buffered
// data is exhausted, and from the embedded ReadWriteCloser afterwards;
// writes and Close go straight to the embedded ReadWriteCloser.
type readWriteCloserBody struct {
	_  incomparable
	br *bufio.Reader // used until empty, then set to nil
	io.ReadWriteCloser
}
2707
2708 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2709 if b.br != nil {
2710 if n := b.br.Buffered(); len(p) > n {
2711 p = p[:n]
2712 }
2713 n, err = b.br.Read(p)
2714 if b.br.Buffered() == 0 {
2715 b.br = nil
2716 }
2717 return n, err
2718 }
2719 return b.ReadWriteCloser.Read(p)
2720 }
2721
2722 func (b *readWriteCloserBody) CloseWrite() error {
2723 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2724 return cw.CloseWrite()
2725 }
2726 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2727 }
2728
2729
// nothingWrittenError wraps an error that occurred before any bytes of the
// request were written to the connection (see mapRoundTripError and
// writeLoop).
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error.
func (nwe nothingWrittenError) Unwrap() error { return nwe.error }
2737
// writeLoop is the HTTP/1 write side of a persistConn, started by dialConn
// on its own goroutine. It writes each request received on pc.writech and
// reports the result on both pc.writeErrCh and the request's own channel.
// It exits after the first write error (closing the connection) or when the
// connection is closed, and closes pc.writeLoopDone on return.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the caller's request body
				// rather than from writing to the connection. Record the
				// unwrapped error on the request, so that
				// mapRoundTripError returns it in preference to whatever
				// error results from this connection being closed below.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // to the body reader, which might recycle us
			wr.ch <- err         // to the roundTrip function
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2775
2776
2777
2778
2779
2780
2781
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the write
// loop to report the result of a request write before giving up and
// reporting false, which stops readLoop from reusing the connection.
// It is a variable rather than a constant, presumably so tests can adjust it.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2783
2784
2785
2786 func (pc *persistConn) wroteRequest() bool {
2787 select {
2788 case err := <-pc.writeErrCh:
2789
2790
2791 return err == nil
2792 default:
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2804 defer t.Stop()
2805 select {
2806 case err := <-pc.writeErrCh:
2807 return err == nil
2808 case <-t.C:
2809 return false
2810 }
2811 }
2812 }
2813
2814
2815
// responseAndError is how readLoop hands the outcome of a request back to
// roundTrip. Exactly one of res or err is set (roundTrip panics otherwise).
type responseAndError struct {
	_   incomparable
	res *Response // else use this response (see res method)
	err error
}
2821
// requestAndChan is the message roundTrip sends to readLoop over pc.reqch
// for each request.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered in roundTrip; readLoop always sends in a select with callerGone

	// addedGzip reports whether the transport itself added the
	// Accept-Encoding: gzip header. If so, readLoop transparently
	// decompresses a gzip-encoded response body.
	addedGzip bool

	// continueCh is the channel readResponse signals (or closes) to tell
	// the write side whether to send the request body after an
	// "Expect: 100-continue" header. It is nil when the request does not
	// expect a continue response.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip caller has returned
}
2840
2841
2842
2843
2844
// writeRequest is the message roundTrip sends to writeLoop over pc.writech
// for each request. writeLoop reports the write result on ch, which
// roundTrip creates with a buffer of one.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error

	// continueCh is the receive side of requestAndChan.continueCh: the
	// write side waits on it (see waitForContinue) before sending the
	// request body. It is nil when the request does not expect a continue
	// response.
	continueCh <-chan struct{}
}
2854
2855
2856
// timeoutError is an error that reports itself as a timeout and as
// temporary, and that matches context.DeadlineExceeded via its Is method.
type timeoutError struct {
	err string
}

// Error returns the error message.
func (e *timeoutError) Error() string {
	return e.err
}

// Timeout always reports true.
func (e *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (e *timeoutError) Temporary() bool {
	return true
}

// Is reports whether target is context.DeadlineExceeded.
func (e *timeoutError) Is(target error) bool {
	return target == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when the response headers do not
// arrive within the Transport's ResponseHeaderTimeout.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2867
2868
2869
// errRequestCanceled is the close error cancelRequest uses for a canceled
// request. It is shared with the net/http/internal package.
var errRequestCanceled = internal.ErrRequestCanceled
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // TODO: unify?

// errRequestDone is the cause readLoop passes when it cancels a request's
// context after the request has completed.
var errRequestDone = errors.New("net/http: request completed")
2876
// nop does nothing; it is the default value of the test hooks below.
func nop() {}

// Test hooks. All default to nop, so they have no effect unless replaced.
var (
	testHookEnterRoundTrip   = nop
	testHookWaitResLoop      = nop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is held by readLoop while it reads
	// testHookReadLoopBeforeNextRead; the default fakeLocker is defined
	// elsewhere.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop
)
2890
2891 func (pc *persistConn) waitForAvailability(ctx context.Context) error {
2892 select {
2893 case <-pc.availch:
2894 return nil
2895 case <-pc.closech:
2896 return pc.closed
2897 case <-ctx.Done():
2898 return ctx.Err()
2899 }
2900 }
2901
// roundTrip sends req on this HTTP/1 connection and waits for the response
// headers. It hands the request to writeLoop and readLoop, then waits for
// whichever comes first: a write error, the connection closing, the
// response header timeout, the response itself, or cancellation of the
// request's context.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()

	pc.mu.Lock()
	if pc.isClientConn {
		if !pc.reserved {
			// No reservation is held: wait (without holding pc.mu) until
			// the connection becomes available.
			pc.mu.Unlock()
			if err := pc.waitForAvailability(req.ctx); err != nil {
				return nil, err
			}
			pc.mu.Lock()
		}
		pc.reserved = false
		pc.inFlight = true
	}
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for a compressed version if the caller didn't set their
	// own value for Accept-Encoding. We only attempt to
	// uncompress the gzip stream if we were the layer that
	// requested it.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		// Request gzip only: readLoop knows how to undo exactly that
		// encoding. Range requests are skipped, because a byte range of a
		// compressed body could not be decompressed by readLoop, and HEAD
		// requests are skipped, since their responses carry no body to
		// decompress (readLoop treats them as body-less).
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		// Buffered so readResponse's single signal does not block.
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when this function returns, telling readLoop that
	// nobody is waiting for the response any more.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse converts what readLoop delivered into this function's
	// return values, mapping errors through mapRoundTripError.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	// respHeaderTimer stays nil (its case never fires) until the request
	// has been written and a ResponseHeaderTimeout is configured.
	var respHeaderTimer <-chan time.Time
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written successfully; start the response
			// header timer if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// The pconn closing raced with the response to the request,
				// probably after the server wrote a response and immediately
				// closed the connection. Use the response.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// readLoop is responsible for canceling req.ctx after
				// it reads the response body. Check for a response racing
				// the context close, and use the response if available.
				return handleResponse(re)
			default:
			}
			// Cancel the request and keep looping: closing the connection
			// makes the pcClosed case fire, which returns the mapped error.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
3051
3052
3053
3054 type tLogKey struct{}
3055
3056 func (tr *transportRequest) logf(format string, args ...any) {
3057 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
3058 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
3059 }
3060 }
3061
3062
3063
3064 func (pc *persistConn) markReused() {
3065 pc.mu.Lock()
3066 pc.reused = true
3067 pc.mu.Unlock()
3068 }
3069
3070
3071
3072
3073
3074
// close closes the connection with the given reason, which must be non-nil
// (closeLocked panics on a nil error). It acquires pc.mu and delegates to
// closeLocked; only the first close has any effect on the connection.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
3080
3081 func (pc *persistConn) closeLocked(err error) {
3082 if err == nil {
3083 panic("nil error")
3084 }
3085 if pc.closed == nil {
3086 pc.closed = err
3087 pc.t.decConnsPerHost(pc.cacheKey)
3088
3089
3090
3091 if pc.alt == nil {
3092 if err != errCallerOwnsConn {
3093 pc.conn.Close()
3094 }
3095 close(pc.closech)
3096 } else {
3097 if cc, ok := pc.alt.(io.Closer); ok {
3098 cc.Close()
3099 }
3100 }
3101 }
3102 pc.mutateHeaderFunc = nil
3103 }
3104
// schemePort returns the default port for scheme: "80" for http, "443" for
// https, "1080" for socks5 and socks5h, and "" for anything else.
func schemePort(scheme string) string {
	switch scheme {
	case "socks5", "socks5h":
		return "1080"
	case "https":
		return "443"
	case "http":
		return "80"
	}
	return ""
}
3117
3118 func idnaASCIIFromURL(url *url.URL) string {
3119 addr := url.Hostname()
3120 if v, err := idnaASCII(addr); err == nil {
3121 addr = v
3122 }
3123 return addr
3124 }
3125
3126
3127 func canonicalAddr(url *url.URL) string {
3128 port := url.Port()
3129 if port == "" {
3130 port = schemePort(url.Scheme)
3131 }
3132 return net.JoinHostPort(idnaASCIIFromURL(url), port)
3133 }
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
// bodyEOFSignal wraps a response body so that readLoop is told when the
// body has been fully read, has failed, or was closed early.
//
// fn is called at most once, with the first non-nil error returned by Read
// (including io.EOF) or with the result of closing the underlying body; its
// return value replaces that error. earlyCloseFn, if set, is called instead
// when the body is closed before io.EOF was seen; in that case Close does
// not close the underlying body itself.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards following 4 fields
	closed       bool              // whether Close has been called
	rerr         error             // sticky Read error
	fn           func(error) error // err will be nil on Read io.EOF
	earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
}
3154
// errReadOnClosedResBody is returned by reads on a response body that has
// already been closed. errConcurrentReadOnResBody is what gzipReader.Read
// returns while another Read on the same body is still in progress.
var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
3157
3158 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
3159 es.mu.Lock()
3160 closed, rerr := es.closed, es.rerr
3161 es.mu.Unlock()
3162 if closed {
3163 return 0, errReadOnClosedResBody
3164 }
3165 if rerr != nil {
3166 return 0, rerr
3167 }
3168
3169 n, err = es.body.Read(p)
3170 if err != nil {
3171 es.mu.Lock()
3172 defer es.mu.Unlock()
3173 if es.rerr == nil {
3174 es.rerr = err
3175 }
3176 err = es.condfn(err)
3177 }
3178 return
3179 }
3180
3181 func (es *bodyEOFSignal) Close() error {
3182 es.mu.Lock()
3183 defer es.mu.Unlock()
3184 if es.closed {
3185 return nil
3186 }
3187 es.closed = true
3188 if es.earlyCloseFn != nil && es.rerr != io.EOF {
3189 return es.earlyCloseFn()
3190 }
3191 err := es.body.Close()
3192 return es.condfn(err)
3193 }
3194
3195
3196 func (es *bodyEOFSignal) condfn(err error) error {
3197 if es.fn == nil {
3198 return err
3199 }
3200 err = es.fn(err)
3201 es.fn = nil
3202 return err
3203 }
3204
3205
3206
3207
3208
// gzipReader decompresses a gzip-encoded body. The gzip.Reader is taken
// from gzipPool lazily by acquire on the first Read and handed back to the
// pool by close, or by release when close ran while a Read was in progress.
type gzipReader struct {
	_    incomparable   // NOTE(review): declared elsewhere; presumably blocks == comparison — confirm
	body *bodyEOFSignal // source of the compressed bytes
	mu   sync.Mutex     // held while accessing zr and zerr
	zr   *gzip.Reader   // pooled reader kept between Reads; nil before first use and while a Read holds it
	zerr error          // sticky init error, or a sentinel: errConcurrentReadOnResBody during a Read, errReadOnClosedResBody after close
}
3216
// eofReader is an always-empty input source: every read reports io.EOF.
type eofReader struct{}

// Read returns 0 bytes and io.EOF without touching the buffer.
func (eofReader) Read([]byte) (int, error) {
	return 0, io.EOF
}

// ReadByte returns a zero byte and io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}
3221
// gzipPool recycles gzip.Reader values; a fresh zero-value reader is
// created whenever the pool is empty.
var gzipPool = sync.Pool{
	New: func() any {
		return new(gzip.Reader)
	},
}
3223
3224
3225 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3226 zr := gzipPool.Get().(*gzip.Reader)
3227 if err := zr.Reset(r); err != nil {
3228 gzipPoolPut(zr)
3229 return nil, err
3230 }
3231 return zr, nil
3232 }
3233
3234
3235 func gzipPoolPut(zr *gzip.Reader) {
3236
3237
3238 var r flate.Reader = eofReader{}
3239 zr.Reset(r)
3240 gzipPool.Put(zr)
3241 }
3242
3243
3244
3245 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3246 gz.mu.Lock()
3247 defer gz.mu.Unlock()
3248 if gz.zerr != nil {
3249 return nil, gz.zerr
3250 }
3251 if gz.zr == nil {
3252 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3253 if gz.zerr != nil {
3254 return nil, gz.zerr
3255 }
3256 }
3257 ret := gz.zr
3258 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3259 return ret, nil
3260 }
3261
3262
3263 func (gz *gzipReader) release(zr *gzip.Reader) {
3264 gz.mu.Lock()
3265 defer gz.mu.Unlock()
3266 if gz.zerr == errConcurrentReadOnResBody {
3267 gz.zr, gz.zerr = zr, nil
3268 } else {
3269 gzipPoolPut(zr)
3270 }
3271 }
3272
3273
3274
3275 func (gz *gzipReader) close() {
3276 gz.mu.Lock()
3277 defer gz.mu.Unlock()
3278 if gz.zerr == nil && gz.zr != nil {
3279 gzipPoolPut(gz.zr)
3280 gz.zr = nil
3281 }
3282 gz.zerr = errReadOnClosedResBody
3283 }
3284
3285 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3286 zr, err := gz.acquire()
3287 if err != nil {
3288 return 0, err
3289 }
3290 defer gz.release(zr)
3291
3292 return zr.Read(p)
3293 }
3294
3295 func (gz *gzipReader) Close() error {
3296 gz.close()
3297
3298 return gz.body.Close()
3299 }
3300
// tlsHandshakeTimeoutError is the error value for a TLS handshake that
// timed out. It reports itself as both a timeout and a temporary error.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3306
3307
3308
3309
// fakeLocker provides Lock and Unlock methods that do nothing.
type fakeLocker struct{}

// Lock is a no-op.
func (fakeLocker) Lock() {
	// Intentionally empty.
}

// Unlock is a no-op.
func (fakeLocker) Unlock() {
	// Intentionally empty.
}
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
// cloneTLSConfig returns a copy of cfg made with cfg.Clone. A nil cfg
// yields a new, empty tls.Config, so the result is never nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3334
// connLRU tracks persistConns in the order they were added: add pushes to
// the front of ll and removeOldest takes from the back. Both fields are
// created lazily by the first call to add.
type connLRU struct {
	ll *list.List                     // element values are *persistConn; front is the most recently added
	m  map[*persistConn]*list.Element // index from conn to its element in ll
}
3339
3340
3341 func (cl *connLRU) add(pc *persistConn) {
3342 if cl.ll == nil {
3343 cl.ll = list.New()
3344 cl.m = make(map[*persistConn]*list.Element)
3345 }
3346 ele := cl.ll.PushFront(pc)
3347 if _, ok := cl.m[pc]; ok {
3348 panic("persistConn was already in LRU")
3349 }
3350 cl.m[pc] = ele
3351 }
3352
3353 func (cl *connLRU) removeOldest() *persistConn {
3354 ele := cl.ll.Back()
3355 pc := ele.Value.(*persistConn)
3356 cl.ll.Remove(ele)
3357 delete(cl.m, pc)
3358 return pc
3359 }
3360
3361
3362 func (cl *connLRU) remove(pc *persistConn) {
3363 if ele, ok := cl.m[pc]; ok {
3364 cl.ll.Remove(ele)
3365 delete(cl.m, pc)
3366 }
3367 }
3368
3369
3370 func (cl *connLRU) len() int {
3371 return len(cl.m)
3372 }
3373
View as plain text