Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal/ascii"
28 "net/textproto"
29 "net/url"
30 "reflect"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35 _ "unsafe"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http/httpproxy"
39 )
40
41
42
43
44
45
// DefaultTransport is a package-level RoundTripper backed by a *Transport.
// It takes its proxy from ProxyFromEnvironment, builds its DialContext from
// a net.Dialer with 30-second Timeout and KeepAlive, sets ForceAttemptHTTP2,
// allows at most 100 idle connections in total, and uses the idle,
// TLS-handshake and Expect-Continue timeouts shown in the literal.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
58
59
60
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// applies when Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Transport holds the configuration and the connection-pool state used to
// perform HTTP round trips. The unexported fields are internal bookkeeping;
// the exported fields are caller-supplied configuration.
type Transport struct {
	// idleMu guards closeIdle, idleConn, idleConnWait and idleLRU.
	idleMu sync.Mutex

	// closeIdle is set by CloseIdleConnections and cleared by
	// queueForIdleConn; while it is set, tryPutIdleConn refuses to pool
	// connections.
	closeIdle bool

	// idleConn holds the idle connections for each key. New entries are
	// appended, and queueForIdleConn hands out from the end of the slice.
	idleConn map[connectMethodKey][]*persistConn

	// idleConnWait holds, per key, the requests waiting for an idle
	// connection.
	idleConnWait map[connectMethodKey]wantConnQueue

	// idleLRU tracks pooled connections so the oldest can be evicted when
	// MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler.
	reqMu sync.Mutex

	// reqCanceler maps an in-flight request to the function that cancels
	// it; CancelRequest looks requests up here.
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto (see RegisterProtocol).
	altMu sync.Mutex

	// altProto holds either nothing or a map[string]RoundTripper keyed by
	// URL scheme. Readers Load it without taking altMu.
	altProto atomic.Value

	// connsPerHostMu guards connsPerHost, connsPerHostWait and
	// dialsInProgress.
	connsPerHostMu sync.Mutex

	// connsPerHost counts connections per key; it is only maintained when
	// MaxConnsPerHost is positive.
	connsPerHost map[connectMethodKey]int

	// connsPerHostWait holds, per key, the requests waiting for a
	// connection slot to free up.
	connsPerHostWait map[connectMethodKey]wantConnQueue

	// dialsInProgress holds the wantConns whose dial has been started.
	dialsInProgress wantConnQueue

	// Proxy, when non-nil, is called with the request to choose a proxy
	// URL. A non-nil error from it aborts the request. When Proxy is nil
	// no proxy URL is set.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is an optional hook.
	// NOTE(review): its call site is outside this section; judging by the
	// signature it receives the proxy CONNECT request and response —
	// confirm.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, when non-nil, creates connections and takes precedence
	// over Dial. Returning a nil connection together with a nil error is
	// reported as an error.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is used to create connections when DialContext is nil. When
	// both are nil, a zero net.Dialer is used.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, when non-nil, creates connections for https
	// requests and takes precedence over DialTLS.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is used to create connections for https requests when
	// DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration; addTLS works on a clone
	// of it. HTTP/2 setup may rewrite its NextProtos field in place.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the TLS handshake in addTLS. Zero means
	// no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, when true, stops connections from being returned
	// to or taken from the idle pool.
	DisableKeepAlives bool

	// DisableCompression is a configuration flag.
	// NOTE(review): its use is outside this section — confirm semantics.
	DisableCompression bool

	// MaxIdleConns caps the number of idle connections across all keys.
	// Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps the idle connections kept per key. Zero
	// selects DefaultMaxIdleConnsPerHost; a negative value disables
	// pooling.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost caps the connections per key. Zero or negative
	// means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is how long a pooled connection may stay idle
	// before it is closed. Zero means no timeout.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is a configuration value.
	// NOTE(review): its use is outside this section — confirm semantics.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is a configuration value.
	// NOTE(review): its use is outside this section — confirm semantics.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a negotiated protocol name to a constructor for
	// the RoundTripper that takes over the TLS connection. If it already
	// has an "h2" entry, the default HTTP/2 setup is skipped; a non-nil
	// map without an "h2" entry leaves HTTP/2 off unless Protocols says
	// otherwise.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is copied by Clone.
	// NOTE(review): presumably headers sent on proxy CONNECT requests —
	// the use site is outside this section.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is an optional hook.
	// NOTE(review): presumably supplies CONNECT headers per proxy and
	// target — the use site is outside this section.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits response header size. Zero or
	// negative selects a default of 10 MiB. A non-zero value also seeds
	// the HTTP/2 header list limit when that has not been set.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size. Zero or negative selects
	// 4 KiB.
	WriteBufferSize int

	// ReadBufferSize is the read buffer size. Zero or negative selects
	// 4 KiB.
	ReadBufferSize int

	// nextProtoOnce guards the one-time call to onceSetNextProtoDefaults.
	nextProtoOnce sync.Once

	// h2transport is non-nil once an HTTP/2 transport has been found or
	// configured; CloseIdleConnections forwards to it.
	h2transport h2Transport

	// tlsNextProtoWasNil records whether TLSNextProto was nil when
	// onceSetNextProtoDefaults ran.
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 keeps HTTP/2 enabled even when a custom dialer or
	// TLS configuration is present.
	ForceAttemptHTTP2 bool

	// HTTP2 holds HTTP/2 settings; Clone copies the pointed-to value.
	// NOTE(review): its consumers are outside this section.
	HTTP2 *HTTP2Config

	// Protocols, when non-nil, is the exact set of protocols to use and
	// overrides the defaults derived from the other fields.
	Protocols *Protocols
}
314
315 func (t *Transport) writeBufferSize() int {
316 if t.WriteBufferSize > 0 {
317 return t.WriteBufferSize
318 }
319 return 4 << 10
320 }
321
322 func (t *Transport) readBufferSize() int {
323 if t.ReadBufferSize > 0 {
324 return t.ReadBufferSize
325 }
326 return 4 << 10
327 }
328
329 func (t *Transport) maxHeaderResponseSize() int64 {
330 if t.MaxResponseHeaderBytes > 0 {
331 return t.MaxResponseHeaderBytes
332 }
333 return 10 << 20
334 }
335
336
337 func (t *Transport) Clone() *Transport {
338 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
339 t2 := &Transport{
340 Proxy: t.Proxy,
341 OnProxyConnectResponse: t.OnProxyConnectResponse,
342 DialContext: t.DialContext,
343 Dial: t.Dial,
344 DialTLS: t.DialTLS,
345 DialTLSContext: t.DialTLSContext,
346 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
347 DisableKeepAlives: t.DisableKeepAlives,
348 DisableCompression: t.DisableCompression,
349 MaxIdleConns: t.MaxIdleConns,
350 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
351 MaxConnsPerHost: t.MaxConnsPerHost,
352 IdleConnTimeout: t.IdleConnTimeout,
353 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
354 ExpectContinueTimeout: t.ExpectContinueTimeout,
355 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
356 GetProxyConnectHeader: t.GetProxyConnectHeader,
357 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
358 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
359 WriteBufferSize: t.WriteBufferSize,
360 ReadBufferSize: t.ReadBufferSize,
361 }
362 if t.TLSClientConfig != nil {
363 t2.TLSClientConfig = t.TLSClientConfig.Clone()
364 }
365 if t.HTTP2 != nil {
366 t2.HTTP2 = &HTTP2Config{}
367 *t2.HTTP2 = *t.HTTP2
368 }
369 if t.Protocols != nil {
370 t2.Protocols = &Protocols{}
371 *t2.Protocols = *t.Protocols
372 }
373 if !t.tlsNextProtoWasNil {
374 npm := maps.Clone(t.TLSNextProto)
375 if npm == nil {
376 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
377 }
378 t2.TLSNextProto = npm
379 }
380 return t2
381 }
382
383
384
385
386
387
388
// h2Transport is the part of an HTTP/2 transport that Transport needs:
// CloseIdleConnections on Transport forwards to it.
type h2Transport interface {
	CloseIdleConnections()
}
392
393 func (t *Transport) hasCustomTLSDialer() bool {
394 return t.DialTLS != nil || t.DialTLSContext != nil
395 }
396
// http2client is the "http2client" godebug setting. A value of "0" makes
// the Transport skip its default HTTP/2 setup.
var http2client = godebug.New("http2client")
398
399
400
// onceSetNextProtoDefaults performs the one-time HTTP/2 setup for t. It is
// run through t.nextProtoOnce. It records whether TLSNextProto was nil and,
// unless HTTP/2 is disabled or already provided, configures an HTTP/2
// transport for t.
func (t *Transport) onceSetNextProtoDefaults() {
	// Capture this before anything below can populate TLSNextProto;
	// Clone consults the flag.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	if http2client.Value() == "0" {
		http2client.IncNonDefault()
		return
	}

	// If the registered "https" alternate protocol is a single-field
	// struct whose field implements h2Transport, remember that value so
	// CloseIdleConnections can reach it, and do no further setup.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(h2Transport); ok {
				t.h2transport = h2i
				return
			}
		}
	}

	if _, ok := t.TLSNextProto["h2"]; ok {
		// An "h2" handler is already installed; leave it alone.
		return
	}
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		// Neither HTTP/2 variant is enabled.
		return
	}
	if omitBundledHTTP2 {
		return
	}
	t2, err := http2configureTransports(t)
	if err != nil {
		// Setup failure is logged, not returned: this function has no
		// error result.
		log.Printf("Error enabling Transport HTTP/2 support: %v", err)
		return
	}
	t.h2transport = t2

	// Seed the HTTP/2 header list limit from MaxResponseHeaderBytes when
	// the HTTP/2 side has none, clamping to what fits in a uint32.
	if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
		const h2max = 1<<32 - 1
		if limit1 >= h2max {
			t2.MaxHeaderListSize = h2max
		} else {
			t2.MaxHeaderListSize = uint32(limit1)
		}
	}

	// Re-derive NextProtos from the enabled protocol set. This writes to
	// the caller's tls.Config in place.
	// NOTE(review): assumes http2configureTransports left
	// t.TLSClientConfig non-nil — confirm.
	t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
}
463
464 func (t *Transport) protocols() Protocols {
465 if t.Protocols != nil {
466 return *t.Protocols
467 }
468 var p Protocols
469 p.SetHTTP1(true)
470 switch {
471 case t.TLSNextProto != nil:
472
473
474 if t.TLSNextProto["h2"] != nil {
475 p.SetHTTP2(true)
476 }
477 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
478
479
480
481
482
483
484 case http2client.Value() == "0":
485 default:
486 p.SetHTTP2(true)
487 }
488 return p
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
508 return envProxyFunc()(req.URL)
509 }
510
511
512
513 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
514 return func(*Request) (*url.URL, error) {
515 return fixedURL, nil
516 }
517 }
518
519
520
521
// transportRequest wraps a *Request with the per-attempt state the
// Transport needs. roundTrip builds a new one for every attempt.
type transportRequest struct {
	*Request
	extra Header                 // extra headers; allocated lazily by extraHeaders
	trace *httptrace.ClientTrace // client trace taken from the request context; may be nil

	ctx    context.Context         // cancelable context created for this round trip
	cancel context.CancelCauseFunc // cancels ctx with a cause

	mu  sync.Mutex // guards err
	err error      // first error recorded through setError
}
533
534 func (tr *transportRequest) extraHeaders() Header {
535 if tr.extra == nil {
536 tr.extra = make(Header)
537 }
538 return tr.extra
539 }
540
541 func (tr *transportRequest) setError(err error) {
542 tr.mu.Lock()
543 if tr.err == nil {
544 tr.err = err
545 }
546 tr.mu.Unlock()
547 }
548
549
550
551 func (t *Transport) useRegisteredProtocol(req *Request) bool {
552 if req.URL.Scheme == "https" && req.requiresHTTP1() {
553
554
555
556
557 return false
558 }
559 return true
560 }
561
562
563
564
565 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
566 if !t.useRegisteredProtocol(req) {
567 return nil
568 }
569 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
570 return altProto[req.URL.Scheme]
571 }
572
573 func validateHeaders(hdrs Header) string {
574 for k, vv := range hdrs {
575 if !httpguts.ValidHeaderFieldName(k) {
576 return fmt.Sprintf("field name %q", k)
577 }
578 for _, v := range vv {
579 if !httpguts.ValidHeaderFieldValue(v) {
580
581
582 return fmt.Sprintf("field value for %q", k)
583 }
584 }
585 }
586 return ""
587 }
588
589
// roundTrip performs a single HTTP transaction for req, retrying on a new
// connection when the failure is judged safe to retry. It validates the
// request, gives any registered alternate protocol first refusal, then
// loops: get a connection, send the request, and on failure either return
// the error or rewind the body and try again.
//
// On every early return before a connection is used, the request body is
// closed. The named result err is read by the deferred function so that a
// failing round trip cancels the per-request context with that error.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Headers and trailers are only validated for http/https;
		// other schemes are left to their alternate RoundTripper.
		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request for the response and for cancel
	// registration; work on a copy whose body tracks reads and closes.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate protocol declined; restore the body before
		// falling through to the built-in path.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// From here on the request runs under its own cancelable context,
	// derived from the request's context.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Bridge the legacy Request.Cancel channel into context cancellation.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Register the request so CancelRequest can find it; the returned
	// cancel also unregisters it.
	cancel = t.prepareTransportCancel(origReq, cancel)

	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		// Non-blocking check: stop before each attempt if the context
		// is already done.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// A fresh transportRequest is built for every attempt.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Obtain a connection, either an idle one or a newly dialed one.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is owned by an alternate RoundTripper.
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// On the alternate path this function cancels the
				// per-request context itself once the response is in
				// hand.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed. Decide between cleanup-and-retry and
		// returning the error.
		if http2isNoCachedConnError(err) {
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retrying: unwrap the internal marker error types so
			// the caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
				// Make sure the body is closed before returning.
				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Retrying: obtain a fresh body if the previous one was used.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
750
751 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
752 select {
753 case <-req.Cancel:
754 cancel(errRequestCanceled)
755 case <-ctx.Done():
756 }
757 }
758
// errCannotRewind is returned by rewindBody when the body has been used
// and the request has no GetBody function to produce a new one.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
760
// readTrackingBody wraps a request body and records whether it has been
// read from and whether it has been closed.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set by Read
	didClose atomic.Bool // set by the first Close
}
766
767 func (r *readTrackingBody) Read(data []byte) (int, error) {
768 r.didRead = true
769 return r.ReadCloser.Read(data)
770 }
771
772 func (r *readTrackingBody) Close() error {
773 if !r.didClose.CompareAndSwap(false, true) {
774 return nil
775 }
776 return r.ReadCloser.Close()
777 }
778
779
780
781
782
783 func setupRewindBody(req *Request) *Request {
784 if req.Body == nil || req.Body == NoBody {
785 return req
786 }
787 newReq := *req
788 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
789 return &newReq
790 }
791
792
793
794
795
796 func rewindBody(req *Request) (rewound *Request, err error) {
797 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
798 return req, nil
799 }
800 if !req.Body.(*readTrackingBody).didClose.Load() {
801 req.closeBody()
802 }
803 if req.GetBody == nil {
804 return nil, errCannotRewind
805 }
806 body, err := req.GetBody()
807 if err != nil {
808 return nil, err
809 }
810 newReq := *req
811 newReq.Body = &readTrackingBody{ReadCloser: body}
812 return &newReq, nil
813 }
814
815
816
817
818 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
819 if http2isNoCachedConnError(err) {
820
821
822
823
824
825
826 return true
827 }
828 if err == errMissingHost {
829
830 return false
831 }
832 if !pc.isReused() {
833
834
835
836
837
838
839
840 return false
841 }
842 if _, ok := err.(nothingWrittenError); ok {
843
844
845 return req.outgoingLength() == 0 || req.GetBody != nil
846 }
847 if !req.isReplayable() {
848
849 return false
850 }
851 if _, ok := err.(transportReadFromServerError); ok {
852
853
854 return true
855 }
856 if err == errServerClosedIdle {
857
858
859
860 return true
861 }
862 return false
863 }
864
865
// ErrSkipAltProtocol is a sentinel error. When a registered alternate
// RoundTripper returns it, the Transport handles the request itself
// instead of returning that result.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
867
868
869
870
871
872
873
874
875
876
877
878 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
879 t.altMu.Lock()
880 defer t.altMu.Unlock()
881 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
882 if _, exists := oldMap[scheme]; exists {
883 panic("protocol " + scheme + " already registered")
884 }
885 newMap := maps.Clone(oldMap)
886 if newMap == nil {
887 newMap = make(map[string]RoundTripper)
888 }
889 newMap[scheme] = rt
890 t.altProto.Store(newMap)
891 }
892
893
894
895
896
// CloseIdleConnections closes every pooled idle connection, cancels
// in-progress dials that no request is waiting on any more, and forwards
// the call to the HTTP/2 transport if there is one.
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	// Detach the whole idle pool under the lock; the connections are
	// closed after the lock is released.
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.closeIdle = true // tryPutIdleConn now rejects connections
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}
	t.connsPerHostMu.Lock()
	t.dialsInProgress.all(func(w *wantConn) {
		// Only cancel dials whose wantConn is no longer waiting for a
		// result and whose cancel function has not been cleared.
		if w.cancelCtx != nil && !w.waiting() {
			w.cancelCtx()
		}
	})
	t.connsPerHostMu.Unlock()
	if t2 := t.h2transport; t2 != nil {
		t2.CloseIdleConnections()
	}
}
921
922
923 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
924
925
926
927
928
929
930 cancel := func(err error) {
931 origCancel(err)
932 t.reqMu.Lock()
933 delete(t.reqCanceler, req)
934 t.reqMu.Unlock()
935 }
936 t.reqMu.Lock()
937 if t.reqCanceler == nil {
938 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
939 }
940 t.reqCanceler[req] = cancel
941 t.reqMu.Unlock()
942 return cancel
943 }
944
945
946
947
948
949
950
951 func (t *Transport) CancelRequest(req *Request) {
952 t.reqMu.Lock()
953 cancel := t.reqCanceler[req]
954 t.reqMu.Unlock()
955 if cancel != nil {
956 cancel(errRequestCanceled)
957 }
958 }
959
960
961
962
963
var (
	// envProxyOnce guards the one-time initialization of
	// envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the cached proxy function derived from the
	// environment; it is set by envProxyFunc.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
968
969
970
971 func envProxyFunc() func(*url.URL) (*url.URL, error) {
972 envProxyOnce.Do(func() {
973 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
974 })
975 return envProxyFuncValue
976 }
977
978
979 func resetProxyConfig() {
980 envProxyOnce = sync.Once{}
981 envProxyFuncValue = nil
982 }
983
984 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
985 cm.targetScheme = treq.URL.Scheme
986 cm.targetAddr = canonicalAddr(treq.URL)
987 if t.Proxy != nil {
988 cm.proxyURL, err = t.Proxy(treq.Request)
989 }
990 cm.onlyH1 = treq.requiresHTTP1()
991 return cm, err
992 }
993
994
995
996 func (cm *connectMethod) proxyAuth() string {
997 if cm.proxyURL == nil {
998 return ""
999 }
1000 if u := cm.proxyURL.User; u != nil {
1001 username := u.Username()
1002 password, _ := u.Password()
1003 return "Basic " + basicAuth(username, password)
1004 }
1005 return ""
1006 }
1007
1008
// Errors describing why a connection was not pooled or why it was closed.
var (
	// Returned by tryPutIdleConn when keep-alives are disabled or
	// MaxIdleConnsPerHost is negative.
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	// Returned by tryPutIdleConn when the connection reports itself broken.
	errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
	// Returned by tryPutIdleConn after CloseIdleConnections, until a new
	// request asks for a connection.
	errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
	// Used to close the least recently used connection when MaxIdleConns
	// is exceeded.
	errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
	// Returned by tryPutIdleConn when the per-host idle limit is reached.
	errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
	// Used by CloseIdleConnections to close pooled connections.
	errCloseIdleConns = errors.New("http: CloseIdleConnections called")
	// NOTE(review): the use sites of the next two are outside this section.
	errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout = errors.New("http: idle connection timeout")

	// errServerClosedIdle is treated as retryable by shouldRetryRequest
	// for replayable requests on reused connections.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1025
1026
1027
1028
1029
1030
1031
1032
1033
// transportReadFromServerError wraps an error from reading the server's
// response. shouldRetryRequest treats it as retryable for replayable
// requests, and roundTrip unwraps it before returning it to the caller.
type transportReadFromServerError struct {
	err error // the underlying read error
}
1037
1038 func (e transportReadFromServerError) Unwrap() error { return e.err }
1039
1040 func (e transportReadFromServerError) Error() string {
1041 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
1042 }
1043
1044 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1045 if err := t.tryPutIdleConn(pconn); err != nil {
1046 pconn.close(err)
1047 }
1048 }
1049
1050 func (t *Transport) maxIdleConnsPerHost() int {
1051 if v := t.MaxIdleConnsPerHost; v != 0 {
1052 return v
1053 }
1054 return DefaultMaxIdleConnsPerHost
1055 }
1056
1057
1058
1059
1060
1061
// tryPutIdleConn hands pconn to a request waiting for an idle connection
// or, failing that, adds it to the idle pool. It returns nil when pconn was
// delivered or pooled, and otherwise an error saying why not. On error the
// caller still owns pconn (putOrCloseIdleConn closes it).
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// An alt connection that the LRU already tracks is already pooled;
	// there is nothing more to do.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Offer the connection to waiting requests before pooling it.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// A regular connection goes to exactly one waiter: stop at
			// the first successful delivery.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// An alt connection is offered to every waiter, and done
			// stays false so the connection is also pooled below.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		// q is a copy of the map value; write it back or drop the key.
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// The same connection must never be pooled twice; this is treated as
	// a fatal internal error.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global idle limit by evicting the least recently used
	// connection.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm or re-arm the idle timer. Alt connections get no idle timer
	// here.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1154
1155
1156
1157
// queueForIdleConn tries to satisfy w from the idle pool. It reports
// whether an idle connection was delivered to w. When none is usable, w is
// queued so that a later tryPutIdleConn can deliver to it.
//
// A nil w is allowed: the call then only clears t.closeIdle.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A new request is asking for a connection, so pooling is allowed
	// again after an earlier CloseIdleConnections.
	t.closeIdle = false

	if w == nil {
		// Nothing to deliver to.
		return false
	}

	// Connections idle since before oldTime are considered expired.
	// oldTime stays zero when there is no idle timeout.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Look for the most recently pooled usable connection.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		// NOTE: this declaration shadows the named result; the value is
		// returned explicitly below.
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading so the
			// comparison uses wall-clock time.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Close it asynchronously: this goroutine holds idleMu.
				// NOTE(review): presumably closeConnIfStillIdle takes
				// idleMu itself — confirm.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Drop unusable connections from the list and keep
				// looking.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Alt connections stay in the pool after delivery.
				} else {
					// A regular connection leaves the pool once it has
					// been handed out.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w to receive the next one.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1243
1244
1245 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1246 t.idleMu.Lock()
1247 defer t.idleMu.Unlock()
1248 return t.removeIdleConnLocked(pconn)
1249 }
1250
1251
1252 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1253 if pconn.idleTimer != nil {
1254 pconn.idleTimer.Stop()
1255 }
1256 t.idleLRU.remove(pconn)
1257 key := pconn.cacheKey
1258 pconns := t.idleConn[key]
1259 var removed bool
1260 switch len(pconns) {
1261 case 0:
1262
1263 case 1:
1264 if pconns[0] == pconn {
1265 delete(t.idleConn, key)
1266 removed = true
1267 }
1268 default:
1269 for i, v := range pconns {
1270 if v != pconn {
1271 continue
1272 }
1273
1274
1275 copy(pconns[i:], pconns[i+1:])
1276 t.idleConn[key] = pconns[:len(pconns)-1]
1277 removed = true
1278 break
1279 }
1280 }
1281 return removed
1282 }
1283
// zeroDialer is the net.Dialer, with all defaults, that dial falls back to
// when the Transport has neither DialContext nor Dial set.
var zeroDialer net.Dialer
1285
1286 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1287 if t.DialContext != nil {
1288 c, err := t.DialContext(ctx, network, addr)
1289 if c == nil && err == nil {
1290 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1291 }
1292 return c, err
1293 }
1294 if t.Dial != nil {
1295 c, err := t.Dial(network, addr)
1296 if c == nil && err == nil {
1297 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1298 }
1299 return c, err
1300 }
1301 return zeroDialer.DialContext(ctx, network, addr)
1302 }
1303
1304
1305
1306
1307
1308
1309
// wantConn records one request's wish for a connection. Exactly one
// connOrError is delivered on result (by tryDeliver), or result is closed
// without a value (by cancel).
type wantConn struct {
	cm  connectMethod
	key connectMethodKey // cm.key()

	// Hooks called around dialing: beforeDial by queueForDial,
	// afterDial when dialConnFor returns. getConn sets them to the
	// test hooks.
	beforeDial func()
	afterDial  func()

	mu        sync.Mutex         // guards ctx, done and the sending on result
	ctx       context.Context    // context for the dial; set to nil once delivered or canceled
	cancelCtx context.CancelFunc // cancels ctx; cleared under connsPerHostMu when the dial finishes
	done      bool               // true after a delivery or a cancel
	result    chan connOrError   // buffered (capacity 1); receives the outcome
}
1326
// connOrError is the outcome delivered to a wantConn: exactly one of pc
// and err is non-nil (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn
	err    error
	idleAt time.Time // when pc went idle; zero for a connection that was not taken from the idle pool
}
1332
1333
1334 func (w *wantConn) waiting() bool {
1335 w.mu.Lock()
1336 defer w.mu.Unlock()
1337
1338 return !w.done
1339 }
1340
1341
1342 func (w *wantConn) getCtxForDial() context.Context {
1343 w.mu.Lock()
1344 defer w.mu.Unlock()
1345
1346 return w.ctx
1347 }
1348
1349
1350 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1351 w.mu.Lock()
1352 defer w.mu.Unlock()
1353
1354 if w.done {
1355 return false
1356 }
1357 if (pc == nil) == (err == nil) {
1358 panic("net/http: internal error: misuse of tryDeliver")
1359 }
1360 w.ctx = nil
1361 w.done = true
1362
1363 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1364 close(w.result)
1365
1366 return true
1367 }
1368
1369
1370
// cancel marks w as no longer wanting a connection. If a connection had
// already been delivered but not yet consumed, it is taken back and, when
// it is a regular (non-alt) connection, returned to t's idle pool or
// closed.
func (w *wantConn) cancel(t *Transport) {
	w.mu.Lock()
	var pc *persistConn
	if w.done {
		// An outcome was delivered. Drain it if it is still in the
		// channel; ok is false when it was already received and the
		// channel is closed.
		if r, ok := <-w.result; ok {
			pc = r.pc
		}
	} else {
		// Nothing delivered yet: close the channel so receivers see it
		// end without a value.
		close(w.result)
	}
	w.ctx = nil
	w.done = true
	w.mu.Unlock()

	// Done outside w.mu. Alt connections are not put back here.
	if pc != nil && pc.alt == nil {
		t.putOrCloseIdleConn(pc)
	}
}
1392
1393
// wantConnQueue is a FIFO queue of wantConns built from two slices.
// Elements are popped from head[headPos:] and pushed by appending to tail.
// When the head stage runs out, popFront swaps the two slices, so the old
// tail becomes the new head and the emptied head's storage is reused as
// the new tail.
type wantConnQueue struct {
	head    []*wantConn // front stage; entries before headPos are already popped
	headPos int         // index of the next element to pop from head
	tail    []*wantConn // back stage; new elements are appended here
}
1409
1410
1411 func (q *wantConnQueue) len() int {
1412 return len(q.head) - q.headPos + len(q.tail)
1413 }
1414
1415
1416 func (q *wantConnQueue) pushBack(w *wantConn) {
1417 q.tail = append(q.tail, w)
1418 }
1419
1420
1421 func (q *wantConnQueue) popFront() *wantConn {
1422 if q.headPos >= len(q.head) {
1423 if len(q.tail) == 0 {
1424 return nil
1425 }
1426
1427 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1428 }
1429 w := q.head[q.headPos]
1430 q.head[q.headPos] = nil
1431 q.headPos++
1432 return w
1433 }
1434
1435
1436 func (q *wantConnQueue) peekFront() *wantConn {
1437 if q.headPos < len(q.head) {
1438 return q.head[q.headPos]
1439 }
1440 if len(q.tail) > 0 {
1441 return q.tail[0]
1442 }
1443 return nil
1444 }
1445
1446
1447
1448 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1449 for {
1450 w := q.peekFront()
1451 if w == nil || w.waiting() {
1452 return cleaned
1453 }
1454 q.popFront()
1455 cleaned = true
1456 }
1457 }
1458
1459
1460 func (q *wantConnQueue) cleanFrontCanceled() {
1461 for {
1462 w := q.peekFront()
1463 if w == nil || w.cancelCtx != nil {
1464 return
1465 }
1466 q.popFront()
1467 }
1468 }
1469
1470
1471
1472 func (q *wantConnQueue) all(f func(*wantConn)) {
1473 for _, w := range q.head[q.headPos:] {
1474 f(w)
1475 }
1476 for _, w := range q.tail {
1477 f(w)
1478 }
1479 }
1480
1481 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1482 if t.DialTLSContext != nil {
1483 conn, err = t.DialTLSContext(ctx, network, addr)
1484 } else {
1485 conn, err = t.DialTLS(network, addr)
1486 }
1487 if conn == nil && err == nil {
1488 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1489 }
1490 return
1491 }
1492
1493
1494
1495
1496
// getConn returns a connection for treq that matches cm: an idle one from
// the pool when available, otherwise a newly dialed one. It blocks until a
// connection or error is delivered or treq's context is done.
//
// When getConn returns an error, the deferred function cancels the
// wantConn so that a connection delivered late is returned to the pool
// instead of being lost.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial context keeps ctx's values but is detached from its
	// cancellation (WithoutCancel); it is canceled only through
	// dialCancel. A dial can therefore outlive the request that started
	// it.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t)
		}
	}()

	// Try the idle pool first; if nothing is delivered, also queue a dial.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for whichever comes first: an outcome or request cancellation.
	select {
	case r := <-w.result:
		// The GotConn trace hook is fired here only for regular
		// connections; alt connections are skipped.
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {
			// If the request was canceled as well, report the
			// cancellation cause in preference to the dial error.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// Not canceled: return the dial error below.
			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1572
1573
1574
1575 func (t *Transport) queueForDial(w *wantConn) {
1576 w.beforeDial()
1577
1578 t.connsPerHostMu.Lock()
1579 defer t.connsPerHostMu.Unlock()
1580
1581 if t.MaxConnsPerHost <= 0 {
1582 t.startDialConnForLocked(w)
1583 return
1584 }
1585
1586 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1587 if t.connsPerHost == nil {
1588 t.connsPerHost = make(map[connectMethodKey]int)
1589 }
1590 t.connsPerHost[w.key] = n + 1
1591 t.startDialConnForLocked(w)
1592 return
1593 }
1594
1595 if t.connsPerHostWait == nil {
1596 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1597 }
1598 q := t.connsPerHostWait[w.key]
1599 q.cleanFrontNotWaiting()
1600 q.pushBack(w)
1601 t.connsPerHostWait[w.key] = q
1602 }
1603
1604
1605
// startDialConnForLocked records w in t.dialsInProgress and starts a
// goroutine that dials for it. As its name says, the caller must hold
// t.connsPerHostMu.
func (t *Transport) startDialConnForLocked(w *wantConn) {
	// Drop finished dials (cancelCtx already cleared) from the front.
	t.dialsInProgress.cleanFrontCanceled()
	t.dialsInProgress.pushBack(w)
	go func() {
		t.dialConnFor(w)
		// Clearing cancelCtx under the lock marks the dial as finished.
		t.connsPerHostMu.Lock()
		defer t.connsPerHostMu.Unlock()
		w.cancelCtx = nil
	}()
}
1616
1617
1618
1619
// dialConnFor dials a connection on behalf of w and delivers the outcome
// to it. If w no longer wants the connection, or the connection is an alt
// connection, the connection is offered to the idle pool. The per-host
// slot is released when no connection results.
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()
	ctx := w.getCtxForDial()
	if ctx == nil {
		// w was already satisfied or canceled: give the slot back
		// without dialing.
		t.decConnsPerHost(w.key)
		return
	}

	pc, err := t.dialConn(ctx, w.cm)
	delivered := w.tryDeliver(pc, err, time.Time{})
	if err == nil && (!delivered || pc.alt != nil) {
		// Either nobody took the new connection, or it is an alt
		// connection; in both cases offer it to the idle pool.
		t.putOrCloseIdleConn(pc)
	}
	if err != nil {
		t.decConnsPerHost(w.key)
	}
}
1640
1641
1642
// decConnsPerHost releases one connection slot for key. If a request is
// waiting for a slot, the slot is passed straight to it by starting its
// dial and the count is left unchanged. Otherwise the count is
// decremented. It does nothing when MaxConnsPerHost is not positive,
// because no counts are kept in that case.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// Releasing a slot that was never taken is an internal error.
		panic("net/http: internal error: connCount underflow")
	}

	// Hand the slot to the first request that is still waiting, if any.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a copy of the map value; store the updated queue.
			t.connsPerHostWait[key] = q
		}
		if done {
			return
		}
	}

	// Nobody took the slot: decrement, dropping the key at zero.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1690
1691
1692
1693
// addTLS wraps pconn.conn in a TLS client connection and performs the
// handshake, bounded by the Transport's TLSHandshakeTimeout when that is
// non-zero. name is used as the TLS ServerName when the configuration does
// not set one. On success pconn.conn and pconn.tlsState are updated; on
// failure the underlying connection is closed and the error is returned.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Work on a clone so the Transport's own config is not modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		// Offer no protocols via ALPN when the connection must be HTTP/1.
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2 lets both the timer and the handshake goroutine send
	// without blocking.
	errc := make(chan error, 2)
	var timer *time.Timer
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	// The first value is either the handshake result or the timeout.
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// On timeout, wait for the handshake goroutine to finish
			// before returning. NOTE(review): presumably the Close
			// above is what unblocks the handshake — confirm.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1742
// erringRoundTripper is implemented by RoundTrippers that can report an error
// through RoundTripErr. dialConn checks the value produced by a TLSNextProto
// factory against this interface and, on a match, returns RoundTripErr()
// instead of using the RoundTripper.
type erringRoundTripper interface {
	RoundTripErr() error
}
1746
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It defaults to context.WithTimeout and is a
// variable, presumably so tests can substitute it.
var testHookProxyConnectTimeout = context.WithTimeout
1748
// dialConn opens a new connection for cm and wraps it in a persistConn.
//
// The steps are, in order:
//  1. Dial the first hop (the proxy when cm.proxyURL is set, otherwise the
//     target). When the first hop is https and a custom TLS dialer is
//     configured, that dialer is used; otherwise a plain dial is followed by
//     addTLS for https.
//  2. Perform proxy setup: SOCKS5 negotiation, Proxy-Authorization injection
//     for plain-HTTP targets, or a CONNECT tunnel for https targets.
//  3. When a proxy is used and the target is https, run TLS to the target
//     over the established connection.
//  4. Hand the connection to an alternate protocol from t.TLSNextProto when
//     only unencrypted HTTP/2 is enabled or a protocol was negotiated during
//     the TLS handshake.
//  5. Otherwise start the HTTP/1 read and write loops.
//
// On failure it returns a nil persistConn and the error.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr marks an error as a "proxyconnect" *net.OpError when a proxy is
	// in use, and returns it unchanged otherwise.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error, per Issue 16997
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// The custom dialer returned a *tls.Conn: run the handshake here
			// so the trace hooks fire and the connection state is recorded.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			// TLS to the first hop, verified against the host part of its address.
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// Do nothing. Not using a proxy.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			// Credentials in the proxy URL: offer username/password
			// authentication in addition to "no authentication".
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests go to the proxy
		// directly, with Proxy-Authorization added to each when configured.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: establish a CONNECT tunnel.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone before modifying so the configured header is left untouched.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange to one minute (in addition to any
		// deadline already carried by ctx).
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the CONNECT request & read the response.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// Cap the number of bytes read for the response at the
			// Transport's maximum response-header size.
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing the connection unblocks the goroutine above; wait for
			// it so resp and err are no longer being written.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err now set
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the numeric code).
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With a proxy in place and an https target, run TLS to the target over
	// the proxied connection.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2: no TLS on the connection, and the configured
	// protocol set enables unencrypted HTTP/2 but not HTTP/1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()
	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// pconn.conn was closed by next (http2configureTransports.upgradeFn).
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// A protocol was negotiated during the TLS handshake: delegate to the
	// matching TLSNextProto entry, if there is one.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// pconn.conn was closed by next (http2configureTransports.upgradeFn).
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// Plain HTTP/1: set up buffered I/O and start the per-connection loops.
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
1961
1962
1963
1964
1965
1966
1967
1968 type persistConnWriter struct {
1969 pc *persistConn
1970 }
1971
1972 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1973 n, err = w.pc.conn.Write(p)
1974 w.pc.nwrite += int64(n)
1975 return
1976 }
1977
1978
1979
1980
1981 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1982 n, err = io.Copy(w.pc.conn, r)
1983 w.pc.nwrite += n
1984 return
1985 }
1986
1987 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
// connectMethod describes how to reach a target: directly or through a proxy,
// and with which scheme. Its key method produces the connectMethodKey derived
// from these fields.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // scheme of the target; dialConn distinguishes "http" and "https"

	// targetAddr is the address of the target. key omits it for "http"
	// targets reached through an http or https proxy.
	targetAddr string
	onlyH1     bool // copied into the key; addTLS clears NextProtos when the key has it set
}
2015
2016 func (cm *connectMethod) key() connectMethodKey {
2017 proxyStr := ""
2018 targetAddr := cm.targetAddr
2019 if cm.proxyURL != nil {
2020 proxyStr = cm.proxyURL.String()
2021 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2022 targetAddr = ""
2023 }
2024 }
2025 return connectMethodKey{
2026 proxy: proxyStr,
2027 scheme: cm.targetScheme,
2028 addr: targetAddr,
2029 onlyH1: cm.onlyH1,
2030 }
2031 }
2032
2033
2034 func (cm *connectMethod) scheme() string {
2035 if cm.proxyURL != nil {
2036 return cm.proxyURL.Scheme
2037 }
2038 return cm.targetScheme
2039 }
2040
2041
2042 func (cm *connectMethod) addr() string {
2043 if cm.proxyURL != nil {
2044 return canonicalAddr(cm.proxyURL)
2045 }
2046 return cm.targetAddr
2047 }
2048
2049
2050
2051 func (cm *connectMethod) tlsHost() string {
2052 h := cm.targetAddr
2053 if hasPort(h) {
2054 h = h[:strings.LastIndex(h, ":")]
2055 }
2056 return h
2057 }
2058
2059
2060
2061
// connectMethodKey is the comparable form of a connectMethod, built by
// connectMethod.key and used as a map key for the Transport's per-connection
// bookkeeping.
type connectMethodKey struct {
	proxy  string
	scheme string
	addr   string
	onlyH1 bool
}

// String renders the key as "proxy|scheme|addr", with ",h1" appended to the
// scheme when onlyH1 is set.
func (k connectMethodKey) String() string {
	h1 := ""
	if k.onlyH1 {
		h1 = ",h1"
	}
	return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
}
2075
2076
2077
// persistConn wraps a single connection to a host or proxy, together with the
// state needed to send requests on it and read the responses.
type persistConn struct {
	// alt, when non-nil, is the RoundTripper that handles this connection
	// instead of the HTTP/1 machinery (dialConn sets it when a TLSNextProto
	// entry takes over). closeLocked does not touch conn or closech in that case.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey
	conn      net.Conn
	tlsState  *tls.ConnectionState // nil unless the connection uses TLS
	br        *bufio.Reader        // reads through persistConn.Read
	bw        *bufio.Writer        // writes through persistConnWriter
	nwrite    int64                // bytes written, maintained by persistConnWriter
	reqch     chan requestAndChan  // sent by roundTrip; received by readLoop
	writech   chan writeRequest    // sent by roundTrip; received by writeLoop
	closech   chan struct{}        // closed by closeLocked
	isProxy   bool                 // set by dialConn for plain-HTTP targets behind a proxy
	sawEOF    bool                 // set by Read when the connection returns io.EOF
	readLimit int64                // remaining bytes Read may return; decremented by Read

	// writeErrCh receives the result of each request write from writeLoop.
	// wroteRequest reads it to decide whether the connection may be reused.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop exits

	// Idle bookkeeping. Neither field is touched in this part of the file;
	// presumably they are maintained by the idle-pool code.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // the accessors in this file take mu around the fields below
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // reason passed to the first closeLocked call; nil while open
	canceledErr          error      // set by cancelRequest
	broken               bool       // set by closeLocked
	reused               bool       // set by markReused

	// mutateHeaderFunc, when non-nil, is applied by roundTrip to the
	// request's extra headers. dialConn sets it to add Proxy-Authorization;
	// closeLocked clears it.
	mutateHeaderFunc func(Header)
}
2120
2121 func (pc *persistConn) maxHeaderResponseSize() int64 {
2122 return pc.t.maxHeaderResponseSize()
2123 }
2124
2125 func (pc *persistConn) Read(p []byte) (n int, err error) {
2126 if pc.readLimit <= 0 {
2127 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2128 }
2129 if int64(len(p)) > pc.readLimit {
2130 p = p[:pc.readLimit]
2131 }
2132 n, err = pc.conn.Read(p)
2133 if err == io.EOF {
2134 pc.sawEOF = true
2135 }
2136 pc.readLimit -= int64(n)
2137 return
2138 }
2139
2140
2141 func (pc *persistConn) isBroken() bool {
2142 pc.mu.Lock()
2143 b := pc.closed != nil
2144 pc.mu.Unlock()
2145 return b
2146 }
2147
2148
2149
2150 func (pc *persistConn) canceled() error {
2151 pc.mu.Lock()
2152 defer pc.mu.Unlock()
2153 return pc.canceledErr
2154 }
2155
2156
2157 func (pc *persistConn) isReused() bool {
2158 pc.mu.Lock()
2159 r := pc.reused
2160 pc.mu.Unlock()
2161 return r
2162 }
2163
// cancelRequest records err as the cancellation cause (later returned by
// canceled) and closes the connection with errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2170
2171
2172
2173
2174 func (pc *persistConn) closeConnIfStillIdle() {
2175 t := pc.t
2176 t.idleMu.Lock()
2177 defer t.idleMu.Unlock()
2178 if _, ok := t.idleLRU.m[pc]; !ok {
2179
2180 return
2181 }
2182 t.removeIdleConnLocked(pc)
2183 pc.close(errIdleConnTimeout)
2184 }
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2195 if err == nil {
2196 return nil
2197 }
2198
2199
2200
2201
2202
2203
2204
2205
2206 <-pc.writeLoopDone
2207
2208
2209
2210
2211 if cerr := pc.canceled(); cerr != nil {
2212 return cerr
2213 }
2214
2215
2216 req.mu.Lock()
2217 reqErr := req.err
2218 req.mu.Unlock()
2219 if reqErr != nil {
2220 return reqErr
2221 }
2222
2223 if err == errServerClosedIdle {
2224
2225 return err
2226 }
2227
2228 if _, ok := err.(transportReadFromServerError); ok {
2229 if pc.nwrite == startBytesWritten {
2230 return nothingWrittenError{err}
2231 }
2232
2233 return err
2234 }
2235 if pc.isBroken() {
2236 if pc.nwrite == startBytesWritten {
2237 return nothingWrittenError{err}
2238 }
2239 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2240 }
2241 return err
2242 }
2243
2244
2245
2246
// errCallerOwnsConn is the close reason readLoop uses when the response body
// is writable. closeLocked treats it specially: the underlying net.Conn is
// left open when this is the reason.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2248
// readLoop is the per-connection goroutine that reads HTTP/1 responses and
// delivers each one to the roundTrip call waiting on rc.ch. It keeps going
// for as long as the connection stays reusable; on exit it closes the
// connection with closeErr and removes it from the Transport's idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default reason passed to pc.close on exit
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers the connection to the idle pool and reports the
	// outcome through the request's PutIdleConn trace hook
	// (errKeepAlivesDisabled is not reported). On failure the error becomes
	// the loop's close reason and false is returned.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc releases goroutines blocked in the body callbacks below: a value
	// is sent once the idle-pool decision has been made after the body hit
	// EOF, and the channel is closed when readLoop exits.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts. (to avoid races in tests)
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		if !hasBody || bodyWritable {
			// No body to wait for: decide about reuse right away. The
			// connection goes back to the idle pool only if nothing has
			// ruled that out, no EOF was seen, and the request was written.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// Now that they've read from the unbuffered channel, they're safely
			// out of the select that also waits on this goroutine to die, so
			// we're allowed to exit now if needed (if alive is false)
			testHookReadLoopBeforeNextRead()
			continue
		}

		// waitForBodyRead receives true when the body was read to EOF and
		// false when it was closed early or failed.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					// Prefer the cancellation cause over the raw read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The Transport asked for gzip itself, so decode transparently
			// and hide the encoding from the caller.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {
		case bodyEOF := <-waitForBodyRead:
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2426
2427 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2428 if pc.closed != nil {
2429 return
2430 }
2431 if n := pc.br.Buffered(); n > 0 {
2432 buf, _ := pc.br.Peek(n)
2433 if is408Message(buf) {
2434 pc.closeLocked(errServerClosedIdle)
2435 return
2436 } else {
2437 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2438 }
2439 }
2440 if peekErr == io.EOF {
2441
2442 pc.closeLocked(errServerClosedIdle)
2443 } else {
2444 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2445 }
2446 }
2447
2448
2449
2450
// is408Message reports whether buf starts with an HTTP/1.x status line
// carrying status code 408: the prefix "HTTP/1.", one arbitrary byte for the
// minor version, then " 408".
func is408Message(buf []byte) bool {
	const shortest = "HTTP/1.x 408"
	return len(buf) >= len(shortest) &&
		string(buf[:7]) == "HTTP/1." &&
		string(buf[8:12]) == " 408"
}
2460
2461
2462
2463
2464 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2465 if trace != nil && trace.GotFirstResponseByte != nil {
2466 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2467 trace.GotFirstResponseByte()
2468 }
2469 }
2470
2471 continueCh := rc.continueCh
2472 for {
2473 resp, err = ReadResponse(pc.br, rc.treq.Request)
2474 if err != nil {
2475 return
2476 }
2477 resCode := resp.StatusCode
2478 if continueCh != nil && resCode == StatusContinue {
2479 if trace != nil && trace.Got100Continue != nil {
2480 trace.Got100Continue()
2481 }
2482 continueCh <- struct{}{}
2483 continueCh = nil
2484 }
2485 is1xx := 100 <= resCode && resCode <= 199
2486
2487 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2488 if is1xxNonTerminal {
2489 if trace != nil && trace.Got1xxResponse != nil {
2490 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2491 return nil, err
2492 }
2493
2494
2495
2496
2497
2498
2499
2500 pc.readLimit = pc.maxHeaderResponseSize()
2501 }
2502 continue
2503 }
2504 break
2505 }
2506 if resp.isProtocolSwitch() {
2507 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2508 }
2509 if continueCh != nil {
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522 if resp.Close || rc.treq.Request.Close {
2523 close(continueCh)
2524 } else {
2525 continueCh <- struct{}{}
2526 }
2527 }
2528
2529 resp.TLS = pc.tlsState
2530 return
2531 }
2532
2533
2534
2535
2536 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2537 if continueCh == nil {
2538 return nil
2539 }
2540 return func() bool {
2541 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2542 defer timer.Stop()
2543
2544 select {
2545 case _, ok := <-continueCh:
2546 return ok
2547 case <-timer.C:
2548 return true
2549 case <-pc.closech:
2550 return false
2551 }
2552 }
2553 }
2554
2555 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2556 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2557 if br.Buffered() != 0 {
2558 body.br = br
2559 }
2560 return body
2561 }
2562
2563
2564
2565
2566
2567
2568 type readWriteCloserBody struct {
2569 _ incomparable
2570 br *bufio.Reader
2571 io.ReadWriteCloser
2572 }
2573
2574 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2575 if b.br != nil {
2576 if n := b.br.Buffered(); len(p) > n {
2577 p = p[:n]
2578 }
2579 n, err = b.br.Read(p)
2580 if b.br.Buffered() == 0 {
2581 b.br = nil
2582 }
2583 return n, err
2584 }
2585 return b.ReadWriteCloser.Read(p)
2586 }
2587
2588 func (b *readWriteCloserBody) CloseWrite() error {
2589 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2590 return cw.CloseWrite()
2591 }
2592 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2593 }
2594
2595
// nothingWrittenError wraps an error that occurred before any bytes of the
// request were written to the connection.
type nothingWrittenError struct {
	error
}

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (nwe nothingWrittenError) Unwrap() error {
	inner := nwe.error
	return inner
}
2603
// writeLoop is the per-connection goroutine that writes requests received on
// pc.writech to the connection. It exits when a write fails (after closing
// the connection with that error) or when the connection is closed, and
// closes pc.writeLoopDone on the way out.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body rather than
				// from the connection. Record it on the request so that
				// mapRoundTripError returns it to the caller.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection for this request.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // to the body reader, which might recycle us
			wr.ch <- err         // to the roundTrip function
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2641
2642
2643
2644
2645
2646
2647
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the write
// loop to report the outcome of a request write before giving up and
// reporting that the connection should not be reused.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2649
2650
2651
2652 func (pc *persistConn) wroteRequest() bool {
2653 select {
2654 case err := <-pc.writeErrCh:
2655
2656
2657 return err == nil
2658 default:
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2670 defer t.Stop()
2671 select {
2672 case err := <-pc.writeErrCh:
2673 return err == nil
2674 case <-t.C:
2675 return false
2676 }
2677 }
2678 }
2679
2680
2681
// responseAndError is the value readLoop delivers to roundTrip for a request.
// Exactly one of res and err is set; roundTrip panics otherwise.
type responseAndError struct {
	_   incomparable
	res *Response // non-nil on success
	err error     // non-nil on failure
}
2687
// requestAndChan is what roundTrip sends to readLoop for each request: the
// request itself plus the channels used to hand back the result.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered; readLoop sends the result here

	// addedGzip records that roundTrip itself added "Accept-Encoding: gzip".
	// When set and the response is gzip-encoded, readLoop decodes the body
	// transparently.
	addedGzip bool

	// continueCh, when non-nil, is signaled (or closed) by readResponse to
	// tell the writer whether to proceed with the request body. roundTrip
	// only creates it for requests that expect a 100 Continue.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2706
2707
2708
2709
2710
// writeRequest is what roundTrip sends to writeLoop: the request to write and
// the channel on which to report the outcome.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the write result (nil on success)

	// continueCh, when non-nil, is the channel waitForContinue blocks on
	// before the request body is written.
	continueCh <-chan struct{}
}
2720
2721
2722
// timeoutError is an error that reports itself as a timeout and matches
// context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string
}

// Error returns the stored message.
func (e *timeoutError) Error() string {
	return e.err
}

// Timeout always reports true.
func (e *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (e *timeoutError) Temporary() bool {
	return true
}

// Is reports whether err is context.DeadlineExceeded.
func (e *timeoutError) Is(err error) bool {
	return err == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when the response headers do not arrive
// within the Transport's ResponseHeaderTimeout.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2733
2734
2735
// errRequestCanceled is the close reason cancelRequest passes to closeLocked.
// It is the same value as http2errRequestCanceled.
var errRequestCanceled = http2errRequestCanceled
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // not used in this part of the file



// errRequestDone is the cause passed to a request's cancel function by
// readLoop once the request has been fully handled.
var errRequestDone = errors.New("net/http: request completed")
2742
// nop does nothing. It is the default value of the test hooks below.
func nop() {}


// Test hooks. Each defaults to nop and is called at the point its name
// describes; presumably tests replace them to observe or control timing.
var (
	testHookEnterRoundTrip   = nop
	testHookWaitResLoop      = nop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu guards the read of testHookReadLoopBeforeNextRead in
	// readLoop. It defaults to a locker that does nothing.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop
)
2756
// roundTrip sends req on this connection and waits for its response.
//
// It hands the request to writeLoop and readLoop and then waits for the first
// of: a write error, the connection closing, the response-header timeout, the
// response (or read error) from readLoop, or cancellation of the request's
// context. Errors are passed through mapRoundTripError, except for the
// header timeout, which returns errTimeout directly.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip on the caller's behalf, but only when the caller has not
	// expressed an encoding preference itself, is not making a Range request,
	// and is not sending a HEAD. readLoop decodes the body transparently when
	// requestedGzip is set.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh is only created for HTTP/1.1+ requests with a body that
	// expect a 100 Continue.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed on return and tells readLoop that nobody is listening
	// on resc any more.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse converts what readLoop delivered into roundTrip's
	// return values, mapping any error through mapRoundTripError.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the request has been written
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written; start the response-header timeout
			// when one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// The pconn closing raced with the response being delivered
				// to us. Prefer the response.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// Cancellation raced with the response being delivered to
				// us. Prefer the response.
				return handleResponse(re)
			default:
			}
			// Closing the connection makes the closech case above fire on
			// the next iteration, which returns the mapped error.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
2894
2895
2896
2897 type tLogKey struct{}
2898
2899 func (tr *transportRequest) logf(format string, args ...any) {
2900 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2901 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2902 }
2903 }
2904
2905
2906
2907 func (pc *persistConn) markReused() {
2908 pc.mu.Lock()
2909 pc.reused = true
2910 pc.mu.Unlock()
2911 }
2912
2913
2914
2915
2916
2917
// close closes the connection with err as the reason, taking pc.mu around the
// call to closeLocked. err must not be nil; closeLocked panics if it is.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
2923
2924 func (pc *persistConn) closeLocked(err error) {
2925 if err == nil {
2926 panic("nil error")
2927 }
2928 pc.broken = true
2929 if pc.closed == nil {
2930 pc.closed = err
2931 pc.t.decConnsPerHost(pc.cacheKey)
2932
2933
2934 if pc.alt == nil {
2935 if err != errCallerOwnsConn {
2936 pc.conn.Close()
2937 }
2938 close(pc.closech)
2939 }
2940 }
2941 pc.mutateHeaderFunc = nil
2942 }
2943
// schemePort returns the default port for scheme as a decimal string: "80"
// for http, "443" for https and "1080" for socks5/socks5h. It returns the
// empty string for any other scheme.
func schemePort(scheme string) string {
	if scheme == "http" {
		return "80"
	}
	if scheme == "https" {
		return "443"
	}
	if scheme == "socks5" || scheme == "socks5h" {
		return "1080"
	}
	return ""
}
2956
2957 func idnaASCIIFromURL(url *url.URL) string {
2958 addr := url.Hostname()
2959 if v, err := idnaASCII(addr); err == nil {
2960 addr = v
2961 }
2962 return addr
2963 }
2964
2965
2966 func canonicalAddr(url *url.URL) string {
2967 port := url.Port()
2968 if port == "" {
2969 port = schemePort(url.Scheme)
2970 }
2971 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2972 }
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
// bodyEOFSignal wraps a response body and notifies its owner about how the
// body was finished.
//
// fn is called at most once, with the first error returned by the body's Read
// (including io.EOF) or with the result of closing the body. earlyCloseFn,
// when set, is called instead when Close happens before Read has seen io.EOF.
// The value returned by either callback is what the caller receives.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // whether Close has been called
	rerr         error             // first non-nil error returned by body.Read
	fn           func(error) error // one-shot completion callback; cleared after use
	earlyCloseFn func() error      // optional callback for Close before io.EOF
}

var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")

// Read reads from the wrapped body. After Close it fails with
// errReadOnClosedResBody, and after a previous read error it keeps returning
// that error. The first read error is recorded and passed through the
// completion callback, whose result is returned in its place.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, priorErr := es.closed, es.rerr
	es.mu.Unlock()

	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case priorErr != nil:
		return 0, priorErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close closes the body. Repeated calls return nil. When the body is closed
// before io.EOF was read and earlyCloseFn is set, only earlyCloseFn runs and
// its result is returned. Otherwise the wrapped body is closed and the result
// is passed through the completion callback.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()

	if es.closed {
		return nil
	}
	es.closed = true

	if early := es.earlyCloseFn; early != nil && es.rerr != io.EOF {
		return early()
	}
	return es.condfn(es.body.Close())
}

// condfn runs the completion callback, if it has not run yet, and returns its
// result; otherwise it returns err unchanged. The caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	hook := es.fn
	if hook == nil {
		return err
	}
	result := hook(err)
	es.fn = nil
	return result
}
3043
3044
3045
3046
3047
// gzipReader wraps a response body and decompresses it as gzip on the fly.
// The gzip.Reader is obtained lazily from gzipPool on the first read and
// given back when the body is closed.
type gzipReader struct {
	_    incomparable
	body *bodyEOFSignal // underlying compressed response body
	mu   sync.Mutex     // guards zr and zerr
	zr   *gzip.Reader   // pooled decompressor; nil until first use and while a Read holds it
	zerr error          // sticky error; errConcurrentReadOnResBody while a Read is in progress
}
3055
// eofReader is a reader that is always at end of input. gzipPoolPut resets
// pooled gzip readers onto it.
type eofReader struct{}

// Read always returns 0, io.EOF.
func (eofReader) Read([]byte) (int, error) {
	return 0, io.EOF
}

// ReadByte always returns 0, io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}
3060
3061 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3062
3063
3064 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3065 zr := gzipPool.Get().(*gzip.Reader)
3066 if err := zr.Reset(r); err != nil {
3067 gzipPoolPut(zr)
3068 return nil, err
3069 }
3070 return zr, nil
3071 }
3072
3073
3074 func gzipPoolPut(zr *gzip.Reader) {
3075
3076
3077 var r flate.Reader = eofReader{}
3078 zr.Reset(r)
3079 gzipPool.Put(zr)
3080 }
3081
3082
3083
3084 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3085 gz.mu.Lock()
3086 defer gz.mu.Unlock()
3087 if gz.zerr != nil {
3088 return nil, gz.zerr
3089 }
3090 if gz.zr == nil {
3091 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3092 if gz.zerr != nil {
3093 return nil, gz.zerr
3094 }
3095 }
3096 ret := gz.zr
3097 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3098 return ret, nil
3099 }
3100
3101
3102 func (gz *gzipReader) release(zr *gzip.Reader) {
3103 gz.mu.Lock()
3104 defer gz.mu.Unlock()
3105 if gz.zerr == errConcurrentReadOnResBody {
3106 gz.zr, gz.zerr = zr, nil
3107 } else {
3108 gzipPoolPut(zr)
3109 }
3110 }
3111
3112
3113
3114 func (gz *gzipReader) close() {
3115 gz.mu.Lock()
3116 defer gz.mu.Unlock()
3117 if gz.zerr == nil && gz.zr != nil {
3118 gzipPoolPut(gz.zr)
3119 gz.zr = nil
3120 }
3121 gz.zerr = errReadOnClosedResBody
3122 }
3123
3124 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3125 zr, err := gz.acquire()
3126 if err != nil {
3127 return 0, err
3128 }
3129 defer gz.release(zr)
3130
3131 return zr.Read(p)
3132 }
3133
3134 func (gz *gzipReader) Close() error {
3135 gz.close()
3136
3137 return gz.body.Close()
3138 }
3139
// tlsHandshakeTimeoutError is the error addTLS reports when the TLS handshake
// does not finish within the Transport's TLSHandshakeTimeout. It is an empty
// comparable struct, so it can be recognized with ==.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3145
3146
3147
3148
// fakeLocker is a sync.Locker that performs no locking. It is the default
// value of testHookMu.
type fakeLocker struct{}

// Lock does nothing.
func (fakeLocker) Lock() {
}

// Unlock does nothing.
func (fakeLocker) Unlock() {
}
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
// cloneTLSConfig returns a copy of cfg that the caller may modify freely, or
// a new empty tls.Config when cfg is nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3173
// connLRU tracks connections in least-recently-added order. Both fields are
// created lazily by add, so the zero value is ready for add, remove and len.
type connLRU struct {
	ll *list.List // list.Element.Value type is of *persistConn
	m  map[*persistConn]*list.Element
}
3178
3179
3180 func (cl *connLRU) add(pc *persistConn) {
3181 if cl.ll == nil {
3182 cl.ll = list.New()
3183 cl.m = make(map[*persistConn]*list.Element)
3184 }
3185 ele := cl.ll.PushFront(pc)
3186 if _, ok := cl.m[pc]; ok {
3187 panic("persistConn was already in LRU")
3188 }
3189 cl.m[pc] = ele
3190 }
3191
3192 func (cl *connLRU) removeOldest() *persistConn {
3193 ele := cl.ll.Back()
3194 pc := ele.Value.(*persistConn)
3195 cl.ll.Remove(ele)
3196 delete(cl.m, pc)
3197 return pc
3198 }
3199
3200
3201 func (cl *connLRU) remove(pc *persistConn) {
3202 if ele, ok := cl.m[pc]; ok {
3203 cl.ll.Remove(ele)
3204 delete(cl.m, pc)
3205 }
3206 }
3207
3208
3209 func (cl *connLRU) len() int {
3210 return len(cl.m)
3211 }
3212
View as plain text