Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "net"
24 "net/http/httptrace"
25 "net/http/internal/ascii"
26 "net/textproto"
27 "net/url"
28 "reflect"
29 "strings"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "golang.org/x/net/http/httpguts"
35 "golang.org/x/net/http/httpproxy"
36 )
37
38
39
40
41
42
43 var DefaultTransport RoundTripper = &Transport{
44 Proxy: ProxyFromEnvironment,
45 DialContext: defaultTransportDialContext(&net.Dialer{
46 Timeout: 30 * time.Second,
47 KeepAlive: 30 * time.Second,
48 }),
49 ForceAttemptHTTP2: true,
50 MaxIdleConns: 100,
51 IdleConnTimeout: 90 * time.Second,
52 TLSHandshakeTimeout: 10 * time.Second,
53 ExpectContinueTimeout: 1 * time.Second,
54 }
55
56
57
58 const DefaultMaxIdleConnsPerHost = 2
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 type Transport struct {
96 idleMu sync.Mutex
97 closeIdle bool
98 idleConn map[connectMethodKey][]*persistConn
99 idleConnWait map[connectMethodKey]wantConnQueue
100 idleLRU connLRU
101
102 reqMu sync.Mutex
103 reqCanceler map[cancelKey]func(error)
104
105 altMu sync.Mutex
106 altProto atomic.Value
107
108 connsPerHostMu sync.Mutex
109 connsPerHost map[connectMethodKey]int
110 connsPerHostWait map[connectMethodKey]wantConnQueue
111 dialsInProgress wantConnQueue
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 Proxy func(*Request) (*url.URL, error)
128
129
130
131
132 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
133
134
135
136
137
138
139
140
141
142 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
143
144
145
146
147
148
149
150
151
152
153
154 Dial func(network, addr string) (net.Conn, error)
155
156
157
158
159
160
161
162
163
164
165
166 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
167
168
169
170
171
172
173
174 DialTLS func(network, addr string) (net.Conn, error)
175
176
177
178
179
180 TLSClientConfig *tls.Config
181
182
183
184 TLSHandshakeTimeout time.Duration
185
186
187
188
189
190
191 DisableKeepAlives bool
192
193
194
195
196
197
198
199
200
201 DisableCompression bool
202
203
204
205 MaxIdleConns int
206
207
208
209
210 MaxIdleConnsPerHost int
211
212
213
214
215
216
217 MaxConnsPerHost int
218
219
220
221
222
223 IdleConnTimeout time.Duration
224
225
226
227
228
229 ResponseHeaderTimeout time.Duration
230
231
232
233
234
235
236
237
238 ExpectContinueTimeout time.Duration
239
240
241
242
243
244
245
246
247
248
249
250 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
251
252
253
254
255 ProxyConnectHeader Header
256
257
258
259
260
261
262
263
264 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
265
266
267
268
269
270
271 MaxResponseHeaderBytes int64
272
273
274
275
276 WriteBufferSize int
277
278
279
280
281 ReadBufferSize int
282
283
284
285 nextProtoOnce sync.Once
286 h2transport h2Transport
287 tlsNextProtoWasNil bool
288
289
290
291
292
293
294 ForceAttemptHTTP2 bool
295 }
296
297
298
299
300 type cancelKey struct {
301 req *Request
302 }
303
304 func (t *Transport) writeBufferSize() int {
305 if t.WriteBufferSize > 0 {
306 return t.WriteBufferSize
307 }
308 return 4 << 10
309 }
310
311 func (t *Transport) readBufferSize() int {
312 if t.ReadBufferSize > 0 {
313 return t.ReadBufferSize
314 }
315 return 4 << 10
316 }
317
318
319 func (t *Transport) Clone() *Transport {
320 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
321 t2 := &Transport{
322 Proxy: t.Proxy,
323 OnProxyConnectResponse: t.OnProxyConnectResponse,
324 DialContext: t.DialContext,
325 Dial: t.Dial,
326 DialTLS: t.DialTLS,
327 DialTLSContext: t.DialTLSContext,
328 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
329 DisableKeepAlives: t.DisableKeepAlives,
330 DisableCompression: t.DisableCompression,
331 MaxIdleConns: t.MaxIdleConns,
332 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
333 MaxConnsPerHost: t.MaxConnsPerHost,
334 IdleConnTimeout: t.IdleConnTimeout,
335 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
336 ExpectContinueTimeout: t.ExpectContinueTimeout,
337 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
338 GetProxyConnectHeader: t.GetProxyConnectHeader,
339 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
340 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
341 WriteBufferSize: t.WriteBufferSize,
342 ReadBufferSize: t.ReadBufferSize,
343 }
344 if t.TLSClientConfig != nil {
345 t2.TLSClientConfig = t.TLSClientConfig.Clone()
346 }
347 if !t.tlsNextProtoWasNil {
348 npm := map[string]func(authority string, c *tls.Conn) RoundTripper{}
349 for k, v := range t.TLSNextProto {
350 npm[k] = v
351 }
352 t2.TLSNextProto = npm
353 }
354 return t2
355 }
356
357
358
359
360
361
362
363 type h2Transport interface {
364 CloseIdleConnections()
365 }
366
367 func (t *Transport) hasCustomTLSDialer() bool {
368 return t.DialTLS != nil || t.DialTLSContext != nil
369 }
370
371 var http2client = godebug.New("http2client")
372
373
374
375 func (t *Transport) onceSetNextProtoDefaults() {
376 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
377 if http2client.Value() == "0" {
378 http2client.IncNonDefault()
379 return
380 }
381
382
383
384
385
386
387 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
388 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
389 if v := rv.Field(0); v.CanInterface() {
390 if h2i, ok := v.Interface().(h2Transport); ok {
391 t.h2transport = h2i
392 return
393 }
394 }
395 }
396
397 if t.TLSNextProto != nil {
398
399
400 return
401 }
402 if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()) {
403
404
405
406
407
408
409 return
410 }
411 if omitBundledHTTP2 {
412 return
413 }
414 t2, err := http2configureTransports(t)
415 if err != nil {
416 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
417 return
418 }
419 t.h2transport = t2
420
421
422
423
424
425
426
427 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
428 const h2max = 1<<32 - 1
429 if limit1 >= h2max {
430 t2.MaxHeaderListSize = h2max
431 } else {
432 t2.MaxHeaderListSize = uint32(limit1)
433 }
434 }
435 }
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
454 return envProxyFunc()(req.URL)
455 }
456
457
458
459 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
460 return func(*Request) (*url.URL, error) {
461 return fixedURL, nil
462 }
463 }
464
465
466
467
468 type transportRequest struct {
469 *Request
470 extra Header
471 trace *httptrace.ClientTrace
472 cancelKey cancelKey
473
474 mu sync.Mutex
475 err error
476 }
477
478 func (tr *transportRequest) extraHeaders() Header {
479 if tr.extra == nil {
480 tr.extra = make(Header)
481 }
482 return tr.extra
483 }
484
485 func (tr *transportRequest) setError(err error) {
486 tr.mu.Lock()
487 if tr.err == nil {
488 tr.err = err
489 }
490 tr.mu.Unlock()
491 }
492
493
494
495 func (t *Transport) useRegisteredProtocol(req *Request) bool {
496 if req.URL.Scheme == "https" && req.requiresHTTP1() {
497
498
499
500
501 return false
502 }
503 return true
504 }
505
506
507
508
509 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
510 if !t.useRegisteredProtocol(req) {
511 return nil
512 }
513 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
514 return altProto[req.URL.Scheme]
515 }
516
517 func validateHeaders(hdrs Header) string {
518 for k, vv := range hdrs {
519 if !httpguts.ValidHeaderFieldName(k) {
520 return fmt.Sprintf("field name %q", k)
521 }
522 for _, v := range vv {
523 if !httpguts.ValidHeaderFieldValue(v) {
524
525
526 return fmt.Sprintf("field value for %q", k)
527 }
528 }
529 }
530 return ""
531 }
532
533
534 func (t *Transport) roundTrip(req *Request) (*Response, error) {
535 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
536 ctx := req.Context()
537 trace := httptrace.ContextClientTrace(ctx)
538
539 if req.URL == nil {
540 req.closeBody()
541 return nil, errors.New("http: nil Request.URL")
542 }
543 if req.Header == nil {
544 req.closeBody()
545 return nil, errors.New("http: nil Request.Header")
546 }
547 scheme := req.URL.Scheme
548 isHTTP := scheme == "http" || scheme == "https"
549 if isHTTP {
550
551 if err := validateHeaders(req.Header); err != "" {
552 req.closeBody()
553 return nil, fmt.Errorf("net/http: invalid header %s", err)
554 }
555
556
557 if err := validateHeaders(req.Trailer); err != "" {
558 req.closeBody()
559 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
560 }
561 }
562
563 origReq := req
564 cancelKey := cancelKey{origReq}
565 req = setupRewindBody(req)
566
567 if altRT := t.alternateRoundTripper(req); altRT != nil {
568 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
569 return resp, err
570 }
571 var err error
572 req, err = rewindBody(req)
573 if err != nil {
574 return nil, err
575 }
576 }
577 if !isHTTP {
578 req.closeBody()
579 return nil, badStringError("unsupported protocol scheme", scheme)
580 }
581 if req.Method != "" && !validMethod(req.Method) {
582 req.closeBody()
583 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
584 }
585 if req.URL.Host == "" {
586 req.closeBody()
587 return nil, errors.New("http: no Host in request URL")
588 }
589
590 for {
591 select {
592 case <-ctx.Done():
593 req.closeBody()
594 return nil, ctx.Err()
595 default:
596 }
597
598
599 treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
600 cm, err := t.connectMethodForRequest(treq)
601 if err != nil {
602 req.closeBody()
603 return nil, err
604 }
605
606
607
608
609
610 pconn, err := t.getConn(treq, cm)
611 if err != nil {
612 t.setReqCanceler(cancelKey, nil)
613 req.closeBody()
614 return nil, err
615 }
616
617 var resp *Response
618 if pconn.alt != nil {
619
620 t.setReqCanceler(cancelKey, nil)
621 resp, err = pconn.alt.RoundTrip(req)
622 } else {
623 resp, err = pconn.roundTrip(treq)
624 }
625 if err == nil {
626 resp.Request = origReq
627 return resp, nil
628 }
629
630
631 if http2isNoCachedConnError(err) {
632 if t.removeIdleConn(pconn) {
633 t.decConnsPerHost(pconn.cacheKey)
634 }
635 } else if !pconn.shouldRetryRequest(req, err) {
636
637
638 if e, ok := err.(nothingWrittenError); ok {
639 err = e.error
640 }
641 if e, ok := err.(transportReadFromServerError); ok {
642 err = e.err
643 }
644 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {
645
646
647
648 req.closeBody()
649 }
650 return nil, err
651 }
652 testHookRoundTripRetried()
653
654
655 req, err = rewindBody(req)
656 if err != nil {
657 return nil, err
658 }
659 }
660 }
661
662 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
663
664 type readTrackingBody struct {
665 io.ReadCloser
666 didRead bool
667 didClose bool
668 }
669
670 func (r *readTrackingBody) Read(data []byte) (int, error) {
671 r.didRead = true
672 return r.ReadCloser.Read(data)
673 }
674
675 func (r *readTrackingBody) Close() error {
676 r.didClose = true
677 return r.ReadCloser.Close()
678 }
679
680
681
682
683
684 func setupRewindBody(req *Request) *Request {
685 if req.Body == nil || req.Body == NoBody {
686 return req
687 }
688 newReq := *req
689 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
690 return &newReq
691 }
692
693
694
695
696
697 func rewindBody(req *Request) (rewound *Request, err error) {
698 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
699 return req, nil
700 }
701 if !req.Body.(*readTrackingBody).didClose {
702 req.closeBody()
703 }
704 if req.GetBody == nil {
705 return nil, errCannotRewind
706 }
707 body, err := req.GetBody()
708 if err != nil {
709 return nil, err
710 }
711 newReq := *req
712 newReq.Body = &readTrackingBody{ReadCloser: body}
713 return &newReq, nil
714 }
715
716
717
718
719 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
720 if http2isNoCachedConnError(err) {
721
722
723
724
725
726
727 return true
728 }
729 if err == errMissingHost {
730
731 return false
732 }
733 if !pc.isReused() {
734
735
736
737
738
739
740
741 return false
742 }
743 if _, ok := err.(nothingWrittenError); ok {
744
745
746 return req.outgoingLength() == 0 || req.GetBody != nil
747 }
748 if !req.isReplayable() {
749
750 return false
751 }
752 if _, ok := err.(transportReadFromServerError); ok {
753
754
755 return true
756 }
757 if err == errServerClosedIdle {
758
759
760
761 return true
762 }
763 return false
764 }
765
766
767 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
768
769
770
771
772
773
774
775
776
777
778
779 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
780 t.altMu.Lock()
781 defer t.altMu.Unlock()
782 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
783 if _, exists := oldMap[scheme]; exists {
784 panic("protocol " + scheme + " already registered")
785 }
786 newMap := make(map[string]RoundTripper)
787 for k, v := range oldMap {
788 newMap[k] = v
789 }
790 newMap[scheme] = rt
791 t.altProto.Store(newMap)
792 }
793
794
795
796
797
798 func (t *Transport) CloseIdleConnections() {
799 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
800 t.idleMu.Lock()
801 m := t.idleConn
802 t.idleConn = nil
803 t.closeIdle = true
804 t.idleLRU = connLRU{}
805 t.idleMu.Unlock()
806 for _, conns := range m {
807 for _, pconn := range conns {
808 pconn.close(errCloseIdleConns)
809 }
810 }
811 t.connsPerHostMu.Lock()
812 t.dialsInProgress.all(func(w *wantConn) {
813 if w.cancelCtx != nil && !w.waiting() {
814 w.cancelCtx()
815 }
816 })
817 t.connsPerHostMu.Unlock()
818 if t2 := t.h2transport; t2 != nil {
819 t2.CloseIdleConnections()
820 }
821 }
822
823
824
825
826
827
828
829 func (t *Transport) CancelRequest(req *Request) {
830 t.cancelRequest(cancelKey{req}, errRequestCanceled)
831 }
832
833
834
835 func (t *Transport) cancelRequest(key cancelKey, err error) bool {
836
837
838 t.reqMu.Lock()
839 defer t.reqMu.Unlock()
840 cancel := t.reqCanceler[key]
841 delete(t.reqCanceler, key)
842 if cancel != nil {
843 cancel(err)
844 }
845
846 return cancel != nil
847 }
848
849
850
851
852
853 var (
854 envProxyOnce sync.Once
855 envProxyFuncValue func(*url.URL) (*url.URL, error)
856 )
857
858
859
860 func envProxyFunc() func(*url.URL) (*url.URL, error) {
861 envProxyOnce.Do(func() {
862 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
863 })
864 return envProxyFuncValue
865 }
866
867
868 func resetProxyConfig() {
869 envProxyOnce = sync.Once{}
870 envProxyFuncValue = nil
871 }
872
873 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
874 cm.targetScheme = treq.URL.Scheme
875 cm.targetAddr = canonicalAddr(treq.URL)
876 if t.Proxy != nil {
877 cm.proxyURL, err = t.Proxy(treq.Request)
878 }
879 cm.onlyH1 = treq.requiresHTTP1()
880 return cm, err
881 }
882
883
884
885 func (cm *connectMethod) proxyAuth() string {
886 if cm.proxyURL == nil {
887 return ""
888 }
889 if u := cm.proxyURL.User; u != nil {
890 username := u.Username()
891 password, _ := u.Password()
892 return "Basic " + basicAuth(username, password)
893 }
894 return ""
895 }
896
897
898 var (
899 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
900 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
901 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
902 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
903 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
904 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
905 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
906 errIdleConnTimeout = errors.New("http: idle connection timeout")
907
908
909
910
911
912 errServerClosedIdle = errors.New("http: server closed idle connection")
913 )
914
915
916
917
918
919
920
921
922
923 type transportReadFromServerError struct {
924 err error
925 }
926
927 func (e transportReadFromServerError) Unwrap() error { return e.err }
928
929 func (e transportReadFromServerError) Error() string {
930 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
931 }
932
933 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
934 if err := t.tryPutIdleConn(pconn); err != nil {
935 pconn.close(err)
936 }
937 }
938
939 func (t *Transport) maxIdleConnsPerHost() int {
940 if v := t.MaxIdleConnsPerHost; v != 0 {
941 return v
942 }
943 return DefaultMaxIdleConnsPerHost
944 }
945
946
947
948
949
950
951 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
952 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
953 return errKeepAlivesDisabled
954 }
955 if pconn.isBroken() {
956 return errConnBroken
957 }
958 pconn.markReused()
959
960 t.idleMu.Lock()
961 defer t.idleMu.Unlock()
962
963
964
965
966 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
967 return nil
968 }
969
970
971
972
973
974 key := pconn.cacheKey
975 if q, ok := t.idleConnWait[key]; ok {
976 done := false
977 if pconn.alt == nil {
978
979
980 for q.len() > 0 {
981 w := q.popFront()
982 if w.tryDeliver(pconn, nil, time.Time{}) {
983 done = true
984 break
985 }
986 }
987 } else {
988
989
990
991
992 for q.len() > 0 {
993 w := q.popFront()
994 w.tryDeliver(pconn, nil, time.Time{})
995 }
996 }
997 if q.len() == 0 {
998 delete(t.idleConnWait, key)
999 } else {
1000 t.idleConnWait[key] = q
1001 }
1002 if done {
1003 return nil
1004 }
1005 }
1006
1007 if t.closeIdle {
1008 return errCloseIdle
1009 }
1010 if t.idleConn == nil {
1011 t.idleConn = make(map[connectMethodKey][]*persistConn)
1012 }
1013 idles := t.idleConn[key]
1014 if len(idles) >= t.maxIdleConnsPerHost() {
1015 return errTooManyIdleHost
1016 }
1017 for _, exist := range idles {
1018 if exist == pconn {
1019 log.Fatalf("dup idle pconn %p in freelist", pconn)
1020 }
1021 }
1022 t.idleConn[key] = append(idles, pconn)
1023 t.idleLRU.add(pconn)
1024 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1025 oldest := t.idleLRU.removeOldest()
1026 oldest.close(errTooManyIdle)
1027 t.removeIdleConnLocked(oldest)
1028 }
1029
1030
1031
1032
1033 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1034 if pconn.idleTimer != nil {
1035 pconn.idleTimer.Reset(t.IdleConnTimeout)
1036 } else {
1037 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1038 }
1039 }
1040 pconn.idleAt = time.Now()
1041 return nil
1042 }
1043
1044
1045
1046
1047 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1048 if t.DisableKeepAlives {
1049 return false
1050 }
1051
1052 t.idleMu.Lock()
1053 defer t.idleMu.Unlock()
1054
1055
1056
1057 t.closeIdle = false
1058
1059 if w == nil {
1060
1061 return false
1062 }
1063
1064
1065
1066
1067 var oldTime time.Time
1068 if t.IdleConnTimeout > 0 {
1069 oldTime = time.Now().Add(-t.IdleConnTimeout)
1070 }
1071
1072
1073 if list, ok := t.idleConn[w.key]; ok {
1074 stop := false
1075 delivered := false
1076 for len(list) > 0 && !stop {
1077 pconn := list[len(list)-1]
1078
1079
1080
1081
1082 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1083 if tooOld {
1084
1085
1086
1087 go pconn.closeConnIfStillIdle()
1088 }
1089 if pconn.isBroken() || tooOld {
1090
1091
1092
1093
1094
1095 list = list[:len(list)-1]
1096 continue
1097 }
1098 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1099 if delivered {
1100 if pconn.alt != nil {
1101
1102
1103 } else {
1104
1105
1106 t.idleLRU.remove(pconn)
1107 list = list[:len(list)-1]
1108 }
1109 }
1110 stop = true
1111 }
1112 if len(list) > 0 {
1113 t.idleConn[w.key] = list
1114 } else {
1115 delete(t.idleConn, w.key)
1116 }
1117 if stop {
1118 return delivered
1119 }
1120 }
1121
1122
1123 if t.idleConnWait == nil {
1124 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1125 }
1126 q := t.idleConnWait[w.key]
1127 q.cleanFrontNotWaiting()
1128 q.pushBack(w)
1129 t.idleConnWait[w.key] = q
1130 return false
1131 }
1132
1133
1134 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1135 t.idleMu.Lock()
1136 defer t.idleMu.Unlock()
1137 return t.removeIdleConnLocked(pconn)
1138 }
1139
1140
1141 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1142 if pconn.idleTimer != nil {
1143 pconn.idleTimer.Stop()
1144 }
1145 t.idleLRU.remove(pconn)
1146 key := pconn.cacheKey
1147 pconns := t.idleConn[key]
1148 var removed bool
1149 switch len(pconns) {
1150 case 0:
1151
1152 case 1:
1153 if pconns[0] == pconn {
1154 delete(t.idleConn, key)
1155 removed = true
1156 }
1157 default:
1158 for i, v := range pconns {
1159 if v != pconn {
1160 continue
1161 }
1162
1163
1164 copy(pconns[i:], pconns[i+1:])
1165 t.idleConn[key] = pconns[:len(pconns)-1]
1166 removed = true
1167 break
1168 }
1169 }
1170 return removed
1171 }
1172
1173 func (t *Transport) setReqCanceler(key cancelKey, fn func(error)) {
1174 t.reqMu.Lock()
1175 defer t.reqMu.Unlock()
1176 if t.reqCanceler == nil {
1177 t.reqCanceler = make(map[cancelKey]func(error))
1178 }
1179 if fn != nil {
1180 t.reqCanceler[key] = fn
1181 } else {
1182 delete(t.reqCanceler, key)
1183 }
1184 }
1185
1186
1187
1188
1189
1190 func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool {
1191 t.reqMu.Lock()
1192 defer t.reqMu.Unlock()
1193 _, ok := t.reqCanceler[key]
1194 if !ok {
1195 return false
1196 }
1197 if fn != nil {
1198 t.reqCanceler[key] = fn
1199 } else {
1200 delete(t.reqCanceler, key)
1201 }
1202 return true
1203 }
1204
1205 var zeroDialer net.Dialer
1206
1207 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1208 if t.DialContext != nil {
1209 c, err := t.DialContext(ctx, network, addr)
1210 if c == nil && err == nil {
1211 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1212 }
1213 return c, err
1214 }
1215 if t.Dial != nil {
1216 c, err := t.Dial(network, addr)
1217 if c == nil && err == nil {
1218 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1219 }
1220 return c, err
1221 }
1222 return zeroDialer.DialContext(ctx, network, addr)
1223 }
1224
1225
1226
1227
1228
1229
1230
1231 type wantConn struct {
1232 cm connectMethod
1233 key connectMethodKey
1234
1235
1236
1237
1238 beforeDial func()
1239 afterDial func()
1240
1241 mu sync.Mutex
1242 ctx context.Context
1243 cancelCtx context.CancelFunc
1244 done bool
1245 result chan connOrError
1246 }
1247
1248 type connOrError struct {
1249 pc *persistConn
1250 err error
1251 idleAt time.Time
1252 }
1253
1254
1255 func (w *wantConn) waiting() bool {
1256 w.mu.Lock()
1257 defer w.mu.Unlock()
1258
1259 return !w.done
1260 }
1261
1262
1263 func (w *wantConn) getCtxForDial() context.Context {
1264 w.mu.Lock()
1265 defer w.mu.Unlock()
1266
1267 return w.ctx
1268 }
1269
1270
1271 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1272 w.mu.Lock()
1273 defer w.mu.Unlock()
1274
1275 if w.done {
1276 return false
1277 }
1278 if (pc == nil) == (err == nil) {
1279 panic("net/http: internal error: misuse of tryDeliver")
1280 }
1281 w.ctx = nil
1282 w.done = true
1283
1284 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1285 close(w.result)
1286
1287 return true
1288 }
1289
1290
1291
1292 func (w *wantConn) cancel(t *Transport, err error) {
1293 w.mu.Lock()
1294 var pc *persistConn
1295 if w.done {
1296 if r, ok := <-w.result; ok {
1297 pc = r.pc
1298 }
1299 } else {
1300 close(w.result)
1301 }
1302 w.ctx = nil
1303 w.done = true
1304 w.mu.Unlock()
1305
1306 if pc != nil {
1307 t.putOrCloseIdleConn(pc)
1308 }
1309 }
1310
1311
1312 type wantConnQueue struct {
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323 head []*wantConn
1324 headPos int
1325 tail []*wantConn
1326 }
1327
1328
1329 func (q *wantConnQueue) len() int {
1330 return len(q.head) - q.headPos + len(q.tail)
1331 }
1332
1333
1334 func (q *wantConnQueue) pushBack(w *wantConn) {
1335 q.tail = append(q.tail, w)
1336 }
1337
1338
1339 func (q *wantConnQueue) popFront() *wantConn {
1340 if q.headPos >= len(q.head) {
1341 if len(q.tail) == 0 {
1342 return nil
1343 }
1344
1345 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1346 }
1347 w := q.head[q.headPos]
1348 q.head[q.headPos] = nil
1349 q.headPos++
1350 return w
1351 }
1352
1353
1354 func (q *wantConnQueue) peekFront() *wantConn {
1355 if q.headPos < len(q.head) {
1356 return q.head[q.headPos]
1357 }
1358 if len(q.tail) > 0 {
1359 return q.tail[0]
1360 }
1361 return nil
1362 }
1363
1364
1365
1366 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1367 for {
1368 w := q.peekFront()
1369 if w == nil || w.waiting() {
1370 return cleaned
1371 }
1372 q.popFront()
1373 cleaned = true
1374 }
1375 }
1376
1377
1378 func (q *wantConnQueue) cleanFrontCanceled() {
1379 for {
1380 w := q.peekFront()
1381 if w == nil || w.cancelCtx != nil {
1382 return
1383 }
1384 q.popFront()
1385 }
1386 }
1387
1388
1389
1390 func (q *wantConnQueue) all(f func(*wantConn)) {
1391 for _, w := range q.head[q.headPos:] {
1392 f(w)
1393 }
1394 for _, w := range q.tail {
1395 f(w)
1396 }
1397 }
1398
1399 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1400 if t.DialTLSContext != nil {
1401 conn, err = t.DialTLSContext(ctx, network, addr)
1402 } else {
1403 conn, err = t.DialTLS(network, addr)
1404 }
1405 if conn == nil && err == nil {
1406 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1407 }
1408 return
1409 }
1410
1411
1412
1413
1414
1415 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1416 req := treq.Request
1417 trace := treq.trace
1418 ctx := req.Context()
1419 if trace != nil && trace.GetConn != nil {
1420 trace.GetConn(cm.addr())
1421 }
1422
1423
1424
1425
1426
1427
1428 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1429
1430 w := &wantConn{
1431 cm: cm,
1432 key: cm.key(),
1433 ctx: dialCtx,
1434 cancelCtx: dialCancel,
1435 result: make(chan connOrError, 1),
1436 beforeDial: testHookPrePendingDial,
1437 afterDial: testHookPostPendingDial,
1438 }
1439 defer func() {
1440 if err != nil {
1441 w.cancel(t, err)
1442 }
1443 }()
1444
1445 var cancelc chan error
1446
1447
1448 if delivered := t.queueForIdleConn(w); delivered {
1449
1450
1451
1452 t.setReqCanceler(treq.cancelKey, func(error) {})
1453 } else {
1454 cancelc = make(chan error, 1)
1455 t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
1456
1457
1458 t.queueForDial(w)
1459 }
1460
1461
1462 select {
1463 case r := <-w.result:
1464
1465
1466 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1467 info := httptrace.GotConnInfo{
1468 Conn: r.pc.conn,
1469 Reused: r.pc.isReused(),
1470 }
1471 if !r.idleAt.IsZero() {
1472 info.WasIdle = true
1473 info.IdleTime = time.Since(r.idleAt)
1474 }
1475 trace.GotConn(info)
1476 }
1477 if r.err != nil {
1478
1479
1480
1481 select {
1482 case <-req.Cancel:
1483 return nil, errRequestCanceledConn
1484 case <-req.Context().Done():
1485 return nil, req.Context().Err()
1486 case err := <-cancelc:
1487 if err == errRequestCanceled {
1488 err = errRequestCanceledConn
1489 }
1490 return nil, err
1491 default:
1492
1493 }
1494 }
1495 return r.pc, r.err
1496 case <-req.Cancel:
1497 return nil, errRequestCanceledConn
1498 case <-req.Context().Done():
1499 return nil, req.Context().Err()
1500 case err := <-cancelc:
1501 if err == errRequestCanceled {
1502 err = errRequestCanceledConn
1503 }
1504 return nil, err
1505 }
1506 }
1507
1508
1509
1510 func (t *Transport) queueForDial(w *wantConn) {
1511 w.beforeDial()
1512
1513 t.connsPerHostMu.Lock()
1514 defer t.connsPerHostMu.Unlock()
1515
1516 if t.MaxConnsPerHost <= 0 {
1517 t.startDialConnForLocked(w)
1518 return
1519 }
1520
1521 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1522 if t.connsPerHost == nil {
1523 t.connsPerHost = make(map[connectMethodKey]int)
1524 }
1525 t.connsPerHost[w.key] = n + 1
1526 t.startDialConnForLocked(w)
1527 return
1528 }
1529
1530 if t.connsPerHostWait == nil {
1531 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1532 }
1533 q := t.connsPerHostWait[w.key]
1534 q.cleanFrontNotWaiting()
1535 q.pushBack(w)
1536 t.connsPerHostWait[w.key] = q
1537 }
1538
1539
1540
1541 func (t *Transport) startDialConnForLocked(w *wantConn) {
1542 t.dialsInProgress.cleanFrontCanceled()
1543 t.dialsInProgress.pushBack(w)
1544 go func() {
1545 t.dialConnFor(w)
1546 t.connsPerHostMu.Lock()
1547 defer t.connsPerHostMu.Unlock()
1548 w.cancelCtx = nil
1549 }()
1550 }
1551
1552
1553
1554
1555 func (t *Transport) dialConnFor(w *wantConn) {
1556 defer w.afterDial()
1557 ctx := w.getCtxForDial()
1558 if ctx == nil {
1559 t.decConnsPerHost(w.key)
1560 return
1561 }
1562
1563 pc, err := t.dialConn(ctx, w.cm)
1564 delivered := w.tryDeliver(pc, err, time.Time{})
1565 if err == nil && (!delivered || pc.alt != nil) {
1566
1567
1568
1569 t.putOrCloseIdleConn(pc)
1570 }
1571 if err != nil {
1572 t.decConnsPerHost(w.key)
1573 }
1574 }
1575
1576
1577
1578 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1579 if t.MaxConnsPerHost <= 0 {
1580 return
1581 }
1582
1583 t.connsPerHostMu.Lock()
1584 defer t.connsPerHostMu.Unlock()
1585 n := t.connsPerHost[key]
1586 if n == 0 {
1587
1588
1589 panic("net/http: internal error: connCount underflow")
1590 }
1591
1592
1593
1594
1595
1596 if q := t.connsPerHostWait[key]; q.len() > 0 {
1597 done := false
1598 for q.len() > 0 {
1599 w := q.popFront()
1600 if w.waiting() {
1601 t.startDialConnForLocked(w)
1602 done = true
1603 break
1604 }
1605 }
1606 if q.len() == 0 {
1607 delete(t.connsPerHostWait, key)
1608 } else {
1609
1610
1611 t.connsPerHostWait[key] = q
1612 }
1613 if done {
1614 return
1615 }
1616 }
1617
1618
1619 if n--; n == 0 {
1620 delete(t.connsPerHost, key)
1621 } else {
1622 t.connsPerHost[key] = n
1623 }
1624 }
1625
1626
1627
1628
1629 func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
1630
1631 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1632 if cfg.ServerName == "" {
1633 cfg.ServerName = name
1634 }
1635 if pconn.cacheKey.onlyH1 {
1636 cfg.NextProtos = nil
1637 }
1638 plainConn := pconn.conn
1639 tlsConn := tls.Client(plainConn, cfg)
1640 errc := make(chan error, 2)
1641 var timer *time.Timer
1642 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1643 timer = time.AfterFunc(d, func() {
1644 errc <- tlsHandshakeTimeoutError{}
1645 })
1646 }
1647 go func() {
1648 if trace != nil && trace.TLSHandshakeStart != nil {
1649 trace.TLSHandshakeStart()
1650 }
1651 err := tlsConn.HandshakeContext(ctx)
1652 if timer != nil {
1653 timer.Stop()
1654 }
1655 errc <- err
1656 }()
1657 if err := <-errc; err != nil {
1658 plainConn.Close()
1659 if err == (tlsHandshakeTimeoutError{}) {
1660
1661
1662 <-errc
1663 }
1664 if trace != nil && trace.TLSHandshakeDone != nil {
1665 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1666 }
1667 return err
1668 }
1669 cs := tlsConn.ConnectionState()
1670 if trace != nil && trace.TLSHandshakeDone != nil {
1671 trace.TLSHandshakeDone(cs, nil)
1672 }
1673 pconn.tlsState = &cs
1674 pconn.conn = tlsConn
1675 return nil
1676 }
1677
1678 type erringRoundTripper interface {
1679 RoundTripErr() error
1680 }
1681
1682 var testHookProxyConnectTimeout = context.WithTimeout
1683
1684 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1685 pconn = &persistConn{
1686 t: t,
1687 cacheKey: cm.key(),
1688 reqch: make(chan requestAndChan, 1),
1689 writech: make(chan writeRequest, 1),
1690 closech: make(chan struct{}),
1691 writeErrCh: make(chan error, 1),
1692 writeLoopDone: make(chan struct{}),
1693 }
1694 trace := httptrace.ContextClientTrace(ctx)
1695 wrapErr := func(err error) error {
1696 if cm.proxyURL != nil {
1697
1698 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1699 }
1700 return err
1701 }
1702 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1703 var err error
1704 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1705 if err != nil {
1706 return nil, wrapErr(err)
1707 }
1708 if tc, ok := pconn.conn.(*tls.Conn); ok {
1709
1710
1711 if trace != nil && trace.TLSHandshakeStart != nil {
1712 trace.TLSHandshakeStart()
1713 }
1714 if err := tc.HandshakeContext(ctx); err != nil {
1715 go pconn.conn.Close()
1716 if trace != nil && trace.TLSHandshakeDone != nil {
1717 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1718 }
1719 return nil, err
1720 }
1721 cs := tc.ConnectionState()
1722 if trace != nil && trace.TLSHandshakeDone != nil {
1723 trace.TLSHandshakeDone(cs, nil)
1724 }
1725 pconn.tlsState = &cs
1726 }
1727 } else {
1728 conn, err := t.dial(ctx, "tcp", cm.addr())
1729 if err != nil {
1730 return nil, wrapErr(err)
1731 }
1732 pconn.conn = conn
1733 if cm.scheme() == "https" {
1734 var firstTLSHost string
1735 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1736 return nil, wrapErr(err)
1737 }
1738 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1739 return nil, wrapErr(err)
1740 }
1741 }
1742 }
1743
1744
1745 switch {
1746 case cm.proxyURL == nil:
1747
1748 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1749 conn := pconn.conn
1750 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1751 if u := cm.proxyURL.User; u != nil {
1752 auth := &socksUsernamePassword{
1753 Username: u.Username(),
1754 }
1755 auth.Password, _ = u.Password()
1756 d.AuthMethods = []socksAuthMethod{
1757 socksAuthMethodNotRequired,
1758 socksAuthMethodUsernamePassword,
1759 }
1760 d.Authenticate = auth.Authenticate
1761 }
1762 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1763 conn.Close()
1764 return nil, err
1765 }
1766 case cm.targetScheme == "http":
1767 pconn.isProxy = true
1768 if pa := cm.proxyAuth(); pa != "" {
1769 pconn.mutateHeaderFunc = func(h Header) {
1770 h.Set("Proxy-Authorization", pa)
1771 }
1772 }
1773 case cm.targetScheme == "https":
1774 conn := pconn.conn
1775 var hdr Header
1776 if t.GetProxyConnectHeader != nil {
1777 var err error
1778 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1779 if err != nil {
1780 conn.Close()
1781 return nil, err
1782 }
1783 } else {
1784 hdr = t.ProxyConnectHeader
1785 }
1786 if hdr == nil {
1787 hdr = make(Header)
1788 }
1789 if pa := cm.proxyAuth(); pa != "" {
1790 hdr = hdr.Clone()
1791 hdr.Set("Proxy-Authorization", pa)
1792 }
1793 connectReq := &Request{
1794 Method: "CONNECT",
1795 URL: &url.URL{Opaque: cm.targetAddr},
1796 Host: cm.targetAddr,
1797 Header: hdr,
1798 }
1799
1800
1801
1802
1803 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1804 defer cancel()
1805
1806 didReadResponse := make(chan struct{})
1807 var (
1808 resp *Response
1809 err error
1810 )
1811
1812 go func() {
1813 defer close(didReadResponse)
1814 err = connectReq.Write(conn)
1815 if err != nil {
1816 return
1817 }
1818
1819
1820 br := bufio.NewReader(conn)
1821 resp, err = ReadResponse(br, connectReq)
1822 }()
1823 select {
1824 case <-connectCtx.Done():
1825 conn.Close()
1826 <-didReadResponse
1827 return nil, connectCtx.Err()
1828 case <-didReadResponse:
1829
1830 }
1831 if err != nil {
1832 conn.Close()
1833 return nil, err
1834 }
1835
1836 if t.OnProxyConnectResponse != nil {
1837 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1838 if err != nil {
1839 conn.Close()
1840 return nil, err
1841 }
1842 }
1843
1844 if resp.StatusCode != 200 {
1845 _, text, ok := strings.Cut(resp.Status, " ")
1846 conn.Close()
1847 if !ok {
1848 return nil, errors.New("unknown status code")
1849 }
1850 return nil, errors.New(text)
1851 }
1852 }
1853
1854 if cm.proxyURL != nil && cm.targetScheme == "https" {
1855 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1856 return nil, err
1857 }
1858 }
1859
1860 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1861 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1862 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1863 if e, ok := alt.(erringRoundTripper); ok {
1864
1865 return nil, e.RoundTripErr()
1866 }
1867 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1868 }
1869 }
1870
1871 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1872 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1873
1874 go pconn.readLoop()
1875 go pconn.writeLoop()
1876 return pconn, nil
1877 }
1878
1879
1880
1881
1882
1883
1884
1885 type persistConnWriter struct {
1886 pc *persistConn
1887 }
1888
1889 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1890 n, err = w.pc.conn.Write(p)
1891 w.pc.nwrite += int64(n)
1892 return
1893 }
1894
1895
1896
1897
1898 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1899 n, err = io.Copy(w.pc.conn, r)
1900 w.pc.nwrite += n
1901 return
1902 }
1903
1904 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922 type connectMethod struct {
1923 _ incomparable
1924 proxyURL *url.URL
1925 targetScheme string
1926
1927
1928
1929 targetAddr string
1930 onlyH1 bool
1931 }
1932
1933 func (cm *connectMethod) key() connectMethodKey {
1934 proxyStr := ""
1935 targetAddr := cm.targetAddr
1936 if cm.proxyURL != nil {
1937 proxyStr = cm.proxyURL.String()
1938 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1939 targetAddr = ""
1940 }
1941 }
1942 return connectMethodKey{
1943 proxy: proxyStr,
1944 scheme: cm.targetScheme,
1945 addr: targetAddr,
1946 onlyH1: cm.onlyH1,
1947 }
1948 }
1949
1950
1951 func (cm *connectMethod) scheme() string {
1952 if cm.proxyURL != nil {
1953 return cm.proxyURL.Scheme
1954 }
1955 return cm.targetScheme
1956 }
1957
1958
1959 func (cm *connectMethod) addr() string {
1960 if cm.proxyURL != nil {
1961 return canonicalAddr(cm.proxyURL)
1962 }
1963 return cm.targetAddr
1964 }
1965
1966
1967
1968 func (cm *connectMethod) tlsHost() string {
1969 h := cm.targetAddr
1970 if hasPort(h) {
1971 h = h[:strings.LastIndex(h, ":")]
1972 }
1973 return h
1974 }
1975
1976
1977
1978
1979 type connectMethodKey struct {
1980 proxy, scheme, addr string
1981 onlyH1 bool
1982 }
1983
1984 func (k connectMethodKey) String() string {
1985
1986 var h1 string
1987 if k.onlyH1 {
1988 h1 = ",h1"
1989 }
1990 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
1991 }
1992
1993
1994
1995 type persistConn struct {
1996
1997
1998
1999 alt RoundTripper
2000
2001 t *Transport
2002 cacheKey connectMethodKey
2003 conn net.Conn
2004 tlsState *tls.ConnectionState
2005 br *bufio.Reader
2006 bw *bufio.Writer
2007 nwrite int64
2008 reqch chan requestAndChan
2009 writech chan writeRequest
2010 closech chan struct{}
2011 isProxy bool
2012 sawEOF bool
2013 readLimit int64
2014
2015
2016
2017
2018 writeErrCh chan error
2019
2020 writeLoopDone chan struct{}
2021
2022
2023 idleAt time.Time
2024 idleTimer *time.Timer
2025
2026 mu sync.Mutex
2027 numExpectedResponses int
2028 closed error
2029 canceledErr error
2030 broken bool
2031 reused bool
2032
2033
2034
2035 mutateHeaderFunc func(Header)
2036 }
2037
2038 func (pc *persistConn) maxHeaderResponseSize() int64 {
2039 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2040 return v
2041 }
2042 return 10 << 20
2043 }
2044
2045 func (pc *persistConn) Read(p []byte) (n int, err error) {
2046 if pc.readLimit <= 0 {
2047 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2048 }
2049 if int64(len(p)) > pc.readLimit {
2050 p = p[:pc.readLimit]
2051 }
2052 n, err = pc.conn.Read(p)
2053 if err == io.EOF {
2054 pc.sawEOF = true
2055 }
2056 pc.readLimit -= int64(n)
2057 return
2058 }
2059
2060
2061 func (pc *persistConn) isBroken() bool {
2062 pc.mu.Lock()
2063 b := pc.closed != nil
2064 pc.mu.Unlock()
2065 return b
2066 }
2067
2068
2069
2070 func (pc *persistConn) canceled() error {
2071 pc.mu.Lock()
2072 defer pc.mu.Unlock()
2073 return pc.canceledErr
2074 }
2075
2076
2077 func (pc *persistConn) isReused() bool {
2078 pc.mu.Lock()
2079 r := pc.reused
2080 pc.mu.Unlock()
2081 return r
2082 }
2083
2084 func (pc *persistConn) cancelRequest(err error) {
2085 pc.mu.Lock()
2086 defer pc.mu.Unlock()
2087 pc.canceledErr = err
2088 pc.closeLocked(errRequestCanceled)
2089 }
2090
2091
2092
2093
2094 func (pc *persistConn) closeConnIfStillIdle() {
2095 t := pc.t
2096 t.idleMu.Lock()
2097 defer t.idleMu.Unlock()
2098 if _, ok := t.idleLRU.m[pc]; !ok {
2099
2100 return
2101 }
2102 t.removeIdleConnLocked(pc)
2103 pc.close(errIdleConnTimeout)
2104 }
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2115 if err == nil {
2116 return nil
2117 }
2118
2119
2120
2121
2122
2123
2124
2125
2126 <-pc.writeLoopDone
2127
2128
2129
2130
2131 if cerr := pc.canceled(); cerr != nil {
2132 return cerr
2133 }
2134
2135
2136 req.mu.Lock()
2137 reqErr := req.err
2138 req.mu.Unlock()
2139 if reqErr != nil {
2140 return reqErr
2141 }
2142
2143 if err == errServerClosedIdle {
2144
2145 return err
2146 }
2147
2148 if _, ok := err.(transportReadFromServerError); ok {
2149 if pc.nwrite == startBytesWritten {
2150 return nothingWrittenError{err}
2151 }
2152
2153 return err
2154 }
2155 if pc.isBroken() {
2156 if pc.nwrite == startBytesWritten {
2157 return nothingWrittenError{err}
2158 }
2159 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2160 }
2161 return err
2162 }
2163
2164
2165
2166
2167 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2168
2169 func (pc *persistConn) readLoop() {
2170 closeErr := errReadLoopExiting
2171 defer func() {
2172 pc.close(closeErr)
2173 pc.t.removeIdleConn(pc)
2174 }()
2175
2176 tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
2177 if err := pc.t.tryPutIdleConn(pc); err != nil {
2178 closeErr = err
2179 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2180 trace.PutIdleConn(err)
2181 }
2182 return false
2183 }
2184 if trace != nil && trace.PutIdleConn != nil {
2185 trace.PutIdleConn(nil)
2186 }
2187 return true
2188 }
2189
2190
2191
2192
2193 eofc := make(chan struct{})
2194 defer close(eofc)
2195
2196
2197 testHookMu.Lock()
2198 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2199 testHookMu.Unlock()
2200
2201 alive := true
2202 for alive {
2203 pc.readLimit = pc.maxHeaderResponseSize()
2204 _, err := pc.br.Peek(1)
2205
2206 pc.mu.Lock()
2207 if pc.numExpectedResponses == 0 {
2208 pc.readLoopPeekFailLocked(err)
2209 pc.mu.Unlock()
2210 return
2211 }
2212 pc.mu.Unlock()
2213
2214 rc := <-pc.reqch
2215 trace := httptrace.ContextClientTrace(rc.req.Context())
2216
2217 var resp *Response
2218 if err == nil {
2219 resp, err = pc.readResponse(rc, trace)
2220 } else {
2221 err = transportReadFromServerError{err}
2222 closeErr = err
2223 }
2224
2225 if err != nil {
2226 if pc.readLimit <= 0 {
2227 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2228 }
2229
2230 select {
2231 case rc.ch <- responseAndError{err: err}:
2232 case <-rc.callerGone:
2233 return
2234 }
2235 return
2236 }
2237 pc.readLimit = maxInt64
2238
2239 pc.mu.Lock()
2240 pc.numExpectedResponses--
2241 pc.mu.Unlock()
2242
2243 bodyWritable := resp.bodyIsWritable()
2244 hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
2245
2246 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
2247
2248
2249
2250 alive = false
2251 }
2252
2253 if !hasBody || bodyWritable {
2254 replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
2255
2256
2257
2258
2259
2260
2261 alive = alive &&
2262 !pc.sawEOF &&
2263 pc.wroteRequest() &&
2264 replaced && tryPutIdleConn(trace)
2265
2266 if bodyWritable {
2267 closeErr = errCallerOwnsConn
2268 }
2269
2270 select {
2271 case rc.ch <- responseAndError{res: resp}:
2272 case <-rc.callerGone:
2273 return
2274 }
2275
2276
2277
2278
2279 testHookReadLoopBeforeNextRead()
2280 continue
2281 }
2282
2283 waitForBodyRead := make(chan bool, 2)
2284 body := &bodyEOFSignal{
2285 body: resp.Body,
2286 earlyCloseFn: func() error {
2287 waitForBodyRead <- false
2288 <-eofc
2289 return nil
2290
2291 },
2292 fn: func(err error) error {
2293 isEOF := err == io.EOF
2294 waitForBodyRead <- isEOF
2295 if isEOF {
2296 <-eofc
2297 } else if err != nil {
2298 if cerr := pc.canceled(); cerr != nil {
2299 return cerr
2300 }
2301 }
2302 return err
2303 },
2304 }
2305
2306 resp.Body = body
2307 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2308 resp.Body = &gzipReader{body: body}
2309 resp.Header.Del("Content-Encoding")
2310 resp.Header.Del("Content-Length")
2311 resp.ContentLength = -1
2312 resp.Uncompressed = true
2313 }
2314
2315 select {
2316 case rc.ch <- responseAndError{res: resp}:
2317 case <-rc.callerGone:
2318 return
2319 }
2320
2321
2322
2323
2324 select {
2325 case bodyEOF := <-waitForBodyRead:
2326 replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
2327 alive = alive &&
2328 bodyEOF &&
2329 !pc.sawEOF &&
2330 pc.wroteRequest() &&
2331 replaced && tryPutIdleConn(trace)
2332 if bodyEOF {
2333 eofc <- struct{}{}
2334 }
2335 case <-rc.req.Cancel:
2336 alive = false
2337 pc.t.cancelRequest(rc.cancelKey, errRequestCanceled)
2338 case <-rc.req.Context().Done():
2339 alive = false
2340 pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
2341 case <-pc.closech:
2342 alive = false
2343 pc.t.setReqCanceler(rc.cancelKey, nil)
2344 }
2345
2346 testHookReadLoopBeforeNextRead()
2347 }
2348 }
2349
2350 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2351 if pc.closed != nil {
2352 return
2353 }
2354 if n := pc.br.Buffered(); n > 0 {
2355 buf, _ := pc.br.Peek(n)
2356 if is408Message(buf) {
2357 pc.closeLocked(errServerClosedIdle)
2358 return
2359 } else {
2360 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2361 }
2362 }
2363 if peekErr == io.EOF {
2364
2365 pc.closeLocked(errServerClosedIdle)
2366 } else {
2367 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2368 }
2369 }
2370
2371
2372
2373
2374 func is408Message(buf []byte) bool {
2375 if len(buf) < len("HTTP/1.x 408") {
2376 return false
2377 }
2378 if string(buf[:7]) != "HTTP/1." {
2379 return false
2380 }
2381 return string(buf[8:12]) == " 408"
2382 }
2383
2384
2385
2386
2387 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2388 if trace != nil && trace.GotFirstResponseByte != nil {
2389 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2390 trace.GotFirstResponseByte()
2391 }
2392 }
2393 num1xx := 0
2394 const max1xxResponses = 5
2395
2396 continueCh := rc.continueCh
2397 for {
2398 resp, err = ReadResponse(pc.br, rc.req)
2399 if err != nil {
2400 return
2401 }
2402 resCode := resp.StatusCode
2403 if continueCh != nil {
2404 if resCode == 100 {
2405 if trace != nil && trace.Got100Continue != nil {
2406 trace.Got100Continue()
2407 }
2408 continueCh <- struct{}{}
2409 continueCh = nil
2410 } else if resCode >= 200 {
2411 close(continueCh)
2412 continueCh = nil
2413 }
2414 }
2415 is1xx := 100 <= resCode && resCode <= 199
2416
2417 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2418 if is1xxNonTerminal {
2419 num1xx++
2420 if num1xx > max1xxResponses {
2421 return nil, errors.New("net/http: too many 1xx informational responses")
2422 }
2423 pc.readLimit = pc.maxHeaderResponseSize()
2424 if trace != nil && trace.Got1xxResponse != nil {
2425 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2426 return nil, err
2427 }
2428 }
2429 continue
2430 }
2431 break
2432 }
2433 if resp.isProtocolSwitch() {
2434 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2435 }
2436
2437 resp.TLS = pc.tlsState
2438 return
2439 }
2440
2441
2442
2443
2444 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2445 if continueCh == nil {
2446 return nil
2447 }
2448 return func() bool {
2449 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2450 defer timer.Stop()
2451
2452 select {
2453 case _, ok := <-continueCh:
2454 return ok
2455 case <-timer.C:
2456 return true
2457 case <-pc.closech:
2458 return false
2459 }
2460 }
2461 }
2462
2463 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2464 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2465 if br.Buffered() != 0 {
2466 body.br = br
2467 }
2468 return body
2469 }
2470
2471
2472
2473
2474
2475
2476 type readWriteCloserBody struct {
2477 _ incomparable
2478 br *bufio.Reader
2479 io.ReadWriteCloser
2480 }
2481
2482 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2483 if b.br != nil {
2484 if n := b.br.Buffered(); len(p) > n {
2485 p = p[:n]
2486 }
2487 n, err = b.br.Read(p)
2488 if b.br.Buffered() == 0 {
2489 b.br = nil
2490 }
2491 return n, err
2492 }
2493 return b.ReadWriteCloser.Read(p)
2494 }
2495
2496
2497 type nothingWrittenError struct {
2498 error
2499 }
2500
2501 func (nwe nothingWrittenError) Unwrap() error {
2502 return nwe.error
2503 }
2504
2505 func (pc *persistConn) writeLoop() {
2506 defer close(pc.writeLoopDone)
2507 for {
2508 select {
2509 case wr := <-pc.writech:
2510 startBytesWritten := pc.nwrite
2511 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2512 if bre, ok := err.(requestBodyReadError); ok {
2513 err = bre.error
2514
2515
2516
2517
2518
2519
2520
2521 wr.req.setError(err)
2522 }
2523 if err == nil {
2524 err = pc.bw.Flush()
2525 }
2526 if err != nil {
2527 if pc.nwrite == startBytesWritten {
2528 err = nothingWrittenError{err}
2529 }
2530 }
2531 pc.writeErrCh <- err
2532 wr.ch <- err
2533 if err != nil {
2534 pc.close(err)
2535 return
2536 }
2537 case <-pc.closech:
2538 return
2539 }
2540 }
2541 }
2542
2543
2544
2545
2546
2547
2548
2549 var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2550
2551
2552
2553 func (pc *persistConn) wroteRequest() bool {
2554 select {
2555 case err := <-pc.writeErrCh:
2556
2557
2558 return err == nil
2559 default:
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2571 defer t.Stop()
2572 select {
2573 case err := <-pc.writeErrCh:
2574 return err == nil
2575 case <-t.C:
2576 return false
2577 }
2578 }
2579 }
2580
2581
2582
2583 type responseAndError struct {
2584 _ incomparable
2585 res *Response
2586 err error
2587 }
2588
2589 type requestAndChan struct {
2590 _ incomparable
2591 req *Request
2592 cancelKey cancelKey
2593 ch chan responseAndError
2594
2595
2596
2597
2598 addedGzip bool
2599
2600
2601
2602
2603
2604 continueCh chan<- struct{}
2605
2606 callerGone <-chan struct{}
2607 }
2608
2609
2610
2611
2612
2613 type writeRequest struct {
2614 req *transportRequest
2615 ch chan<- error
2616
2617
2618
2619
2620 continueCh <-chan struct{}
2621 }
2622
2623
2624
2625 type timeoutError struct {
2626 err string
2627 }
2628
2629 func (e *timeoutError) Error() string { return e.err }
2630 func (e *timeoutError) Timeout() bool { return true }
2631 func (e *timeoutError) Temporary() bool { return true }
2632 func (e *timeoutError) Is(err error) bool { return err == context.DeadlineExceeded }
2633
2634 var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2635
2636
2637
2638 var errRequestCanceled = http2errRequestCanceled
2639 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2640
2641 func nop() {}
2642
2643
2644 var (
2645 testHookEnterRoundTrip = nop
2646 testHookWaitResLoop = nop
2647 testHookRoundTripRetried = nop
2648 testHookPrePendingDial = nop
2649 testHookPostPendingDial = nop
2650
2651 testHookMu sync.Locker = fakeLocker{}
2652 testHookReadLoopBeforeNextRead = nop
2653 )
2654
2655 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2656 testHookEnterRoundTrip()
2657 if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
2658 pc.t.putOrCloseIdleConn(pc)
2659 return nil, errRequestCanceled
2660 }
2661 pc.mu.Lock()
2662 pc.numExpectedResponses++
2663 headerFn := pc.mutateHeaderFunc
2664 pc.mu.Unlock()
2665
2666 if headerFn != nil {
2667 headerFn(req.extraHeaders())
2668 }
2669
2670
2671
2672
2673
2674 requestedGzip := false
2675 if !pc.t.DisableCompression &&
2676 req.Header.Get("Accept-Encoding") == "" &&
2677 req.Header.Get("Range") == "" &&
2678 req.Method != "HEAD" {
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691 requestedGzip = true
2692 req.extraHeaders().Set("Accept-Encoding", "gzip")
2693 }
2694
2695 var continueCh chan struct{}
2696 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2697 continueCh = make(chan struct{}, 1)
2698 }
2699
2700 if pc.t.DisableKeepAlives &&
2701 !req.wantsClose() &&
2702 !isProtocolSwitchHeader(req.Header) {
2703 req.extraHeaders().Set("Connection", "close")
2704 }
2705
2706 gone := make(chan struct{})
2707 defer close(gone)
2708
2709 defer func() {
2710 if err != nil {
2711 pc.t.setReqCanceler(req.cancelKey, nil)
2712 }
2713 }()
2714
2715 const debugRoundTrip = false
2716
2717
2718
2719
2720 startBytesWritten := pc.nwrite
2721 writeErrCh := make(chan error, 1)
2722 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2723
2724 resc := make(chan responseAndError)
2725 pc.reqch <- requestAndChan{
2726 req: req.Request,
2727 cancelKey: req.cancelKey,
2728 ch: resc,
2729 addedGzip: requestedGzip,
2730 continueCh: continueCh,
2731 callerGone: gone,
2732 }
2733
2734 var respHeaderTimer <-chan time.Time
2735 cancelChan := req.Request.Cancel
2736 ctxDoneChan := req.Context().Done()
2737 pcClosed := pc.closech
2738 canceled := false
2739 for {
2740 testHookWaitResLoop()
2741 select {
2742 case err := <-writeErrCh:
2743 if debugRoundTrip {
2744 req.logf("writeErrCh recv: %T/%#v", err, err)
2745 }
2746 if err != nil {
2747 pc.close(fmt.Errorf("write error: %w", err))
2748 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2749 }
2750 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2751 if debugRoundTrip {
2752 req.logf("starting timer for %v", d)
2753 }
2754 timer := time.NewTimer(d)
2755 defer timer.Stop()
2756 respHeaderTimer = timer.C
2757 }
2758 case <-pcClosed:
2759 pcClosed = nil
2760 if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
2761 if debugRoundTrip {
2762 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2763 }
2764 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2765 }
2766 case <-respHeaderTimer:
2767 if debugRoundTrip {
2768 req.logf("timeout waiting for response headers.")
2769 }
2770 pc.close(errTimeout)
2771 return nil, errTimeout
2772 case re := <-resc:
2773 if (re.res == nil) == (re.err == nil) {
2774 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2775 }
2776 if debugRoundTrip {
2777 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2778 }
2779 if re.err != nil {
2780 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2781 }
2782 return re.res, nil
2783 case <-cancelChan:
2784 canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
2785 cancelChan = nil
2786 case <-ctxDoneChan:
2787 canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
2788 cancelChan = nil
2789 ctxDoneChan = nil
2790 }
2791 }
2792 }
2793
2794
2795
2796 type tLogKey struct{}
2797
2798 func (tr *transportRequest) logf(format string, args ...any) {
2799 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2800 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2801 }
2802 }
2803
2804
2805
2806 func (pc *persistConn) markReused() {
2807 pc.mu.Lock()
2808 pc.reused = true
2809 pc.mu.Unlock()
2810 }
2811
2812
2813
2814
2815
2816
2817 func (pc *persistConn) close(err error) {
2818 pc.mu.Lock()
2819 defer pc.mu.Unlock()
2820 pc.closeLocked(err)
2821 }
2822
2823 func (pc *persistConn) closeLocked(err error) {
2824 if err == nil {
2825 panic("nil error")
2826 }
2827 pc.broken = true
2828 if pc.closed == nil {
2829 pc.closed = err
2830 pc.t.decConnsPerHost(pc.cacheKey)
2831
2832
2833 if pc.alt == nil {
2834 if err != errCallerOwnsConn {
2835 pc.conn.Close()
2836 }
2837 close(pc.closech)
2838 }
2839 }
2840 pc.mutateHeaderFunc = nil
2841 }
2842
2843 var portMap = map[string]string{
2844 "http": "80",
2845 "https": "443",
2846 "socks5": "1080",
2847 "socks5h": "1080",
2848 }
2849
2850 func idnaASCIIFromURL(url *url.URL) string {
2851 addr := url.Hostname()
2852 if v, err := idnaASCII(addr); err == nil {
2853 addr = v
2854 }
2855 return addr
2856 }
2857
2858
2859 func canonicalAddr(url *url.URL) string {
2860 port := url.Port()
2861 if port == "" {
2862 port = portMap[url.Scheme]
2863 }
2864 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2865 }
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878 type bodyEOFSignal struct {
2879 body io.ReadCloser
2880 mu sync.Mutex
2881 closed bool
2882 rerr error
2883 fn func(error) error
2884 earlyCloseFn func() error
2885 }
2886
2887 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2888
2889 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2890 es.mu.Lock()
2891 closed, rerr := es.closed, es.rerr
2892 es.mu.Unlock()
2893 if closed {
2894 return 0, errReadOnClosedResBody
2895 }
2896 if rerr != nil {
2897 return 0, rerr
2898 }
2899
2900 n, err = es.body.Read(p)
2901 if err != nil {
2902 es.mu.Lock()
2903 defer es.mu.Unlock()
2904 if es.rerr == nil {
2905 es.rerr = err
2906 }
2907 err = es.condfn(err)
2908 }
2909 return
2910 }
2911
2912 func (es *bodyEOFSignal) Close() error {
2913 es.mu.Lock()
2914 defer es.mu.Unlock()
2915 if es.closed {
2916 return nil
2917 }
2918 es.closed = true
2919 if es.earlyCloseFn != nil && es.rerr != io.EOF {
2920 return es.earlyCloseFn()
2921 }
2922 err := es.body.Close()
2923 return es.condfn(err)
2924 }
2925
2926
2927 func (es *bodyEOFSignal) condfn(err error) error {
2928 if es.fn == nil {
2929 return err
2930 }
2931 err = es.fn(err)
2932 es.fn = nil
2933 return err
2934 }
2935
2936
2937
2938 type gzipReader struct {
2939 _ incomparable
2940 body *bodyEOFSignal
2941 zr *gzip.Reader
2942 zerr error
2943 }
2944
2945 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2946 if gz.zr == nil {
2947 if gz.zerr == nil {
2948 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2949 }
2950 if gz.zerr != nil {
2951 return 0, gz.zerr
2952 }
2953 }
2954
2955 gz.body.mu.Lock()
2956 if gz.body.closed {
2957 err = errReadOnClosedResBody
2958 }
2959 gz.body.mu.Unlock()
2960
2961 if err != nil {
2962 return 0, err
2963 }
2964 return gz.zr.Read(p)
2965 }
2966
2967 func (gz *gzipReader) Close() error {
2968 return gz.body.Close()
2969 }
2970
2971 type tlsHandshakeTimeoutError struct{}
2972
2973 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
2974 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
2975 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
2976
2977
2978
2979
2980 type fakeLocker struct{}
2981
2982 func (fakeLocker) Lock() {}
2983 func (fakeLocker) Unlock() {}
2984
2985
2986
2987
2988 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
2989 if cfg == nil {
2990 return &tls.Config{}
2991 }
2992 return cfg.Clone()
2993 }
2994
2995 type connLRU struct {
2996 ll *list.List
2997 m map[*persistConn]*list.Element
2998 }
2999
3000
3001 func (cl *connLRU) add(pc *persistConn) {
3002 if cl.ll == nil {
3003 cl.ll = list.New()
3004 cl.m = make(map[*persistConn]*list.Element)
3005 }
3006 ele := cl.ll.PushFront(pc)
3007 if _, ok := cl.m[pc]; ok {
3008 panic("persistConn was already in LRU")
3009 }
3010 cl.m[pc] = ele
3011 }
3012
3013 func (cl *connLRU) removeOldest() *persistConn {
3014 ele := cl.ll.Back()
3015 pc := ele.Value.(*persistConn)
3016 cl.ll.Remove(ele)
3017 delete(cl.m, pc)
3018 return pc
3019 }
3020
3021
3022 func (cl *connLRU) remove(pc *persistConn) {
3023 if ele, ok := cl.m[pc]; ok {
3024 cl.ll.Remove(ele)
3025 delete(cl.m, pc)
3026 }
3027 }
3028
3029
3030 func (cl *connLRU) len() int {
3031 return len(cl.m)
3032 }
3033
View as plain text