Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "maps"
24 "net"
25 "net/http/httptrace"
26 "net/http/internal/ascii"
27 "net/textproto"
28 "net/url"
29 "reflect"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34 _ "unsafe"
35
36 "golang.org/x/net/http/httpguts"
37 "golang.org/x/net/http/httpproxy"
38 )
39
40
41
42
43
44
// DefaultTransport is the package-level RoundTripper value. It is a
// *Transport that takes its proxy from ProxyFromEnvironment, dials through
// a net.Dialer with 30-second Timeout and KeepAlive settings, attempts
// HTTP/2, keeps at most 100 idle connections for up to 90 seconds, and
// uses a 10-second TLS handshake timeout and a 1-second Expect-Continue
// timeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	// defaultTransportDialContext is defined outside this view; presumably
	// it adapts the Dialer's DialContext method - TODO confirm.
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
57
58
59
// DefaultMaxIdleConnsPerHost is the per-key idle connection limit that
// Transport.maxIdleConnsPerHost falls back to when
// Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Transport is the RoundTripper implementation in this file. It keeps a
// pool of idle persistent connections keyed by connectMethodKey, queues of
// requests waiting for a connection, and the exported configuration that
// controls dialing, TLS, proxying and protocol selection.
type Transport struct {
	// idleMu guards closeIdle, idleConn, idleConnWait and idleLRU.
	idleMu sync.Mutex
	// closeIdle is set by CloseIdleConnections and cleared by
	// queueForIdleConn; while set, tryPutIdleConn refuses to pool
	// connections (errCloseIdle).
	closeIdle bool
	// idleConn holds idle connections per key; the newest is at the end.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnWait holds, per key, the requests waiting for an idle
	// connection.
	idleConnWait map[connectMethodKey]wantConnQueue
	// idleLRU tracks all idle connections so the oldest can be evicted
	// when MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler.
	reqMu sync.Mutex
	// reqCanceler maps an in-flight request to the function that cancels
	// it; CancelRequest looks requests up here.
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto (see RegisterProtocol).
	altMu sync.Mutex
	// altProto holds a map[string]RoundTripper (scheme to round tripper),
	// or nothing. RegisterProtocol stores a fresh copy on every change,
	// so readers may Load it without holding altMu.
	altProto atomic.Value

	// connsPerHostMu guards connsPerHost, connsPerHostWait and
	// dialsInProgress.
	connsPerHostMu sync.Mutex
	// connsPerHost counts connections per key; it is only maintained when
	// MaxConnsPerHost is positive.
	connsPerHost map[connectMethodKey]int
	// connsPerHostWait holds requests waiting for a per-host slot.
	connsPerHostWait map[connectMethodKey]wantConnQueue
	// dialsInProgress holds the wantConns for which a dial was started.
	dialsInProgress wantConnQueue

	// Proxy, when non-nil, is called for each request to choose the proxy
	// URL; an error it returns aborts the request.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is not used in the visible part of this
	// file; presumably it is invoked with the response to a proxy CONNECT
	// request - TODO confirm.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, when non-nil, is used by dial to create connections.
	// Returning (nil, nil) is reported as an error.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is used by dial only when DialContext is nil. Returning
	// (nil, nil) is reported as an error.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, when non-nil, is preferred by customDialTLS for
	// "https" connections. If the returned connection is a *tls.Conn,
	// dialConn performs the handshake on it.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is used by customDialTLS when DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration; addTLS works on a copy of
	// it and fills in ServerName when that is empty.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the handshake in addTLS. Zero means no
	// timeout timer is started.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, when true, prevents connections from being
	// pooled or reused from the idle pool.
	DisableKeepAlives bool

	// DisableCompression is not consulted in the visible part of this
	// file; presumably it disables transparent gzip handling - TODO
	// confirm.
	DisableCompression bool

	// MaxIdleConns caps the total number of idle connections. Zero means
	// no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps idle connections per key. Zero selects
	// DefaultMaxIdleConnsPerHost; a negative value makes tryPutIdleConn
	// refuse to pool connections.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost caps connections per key, counting dials in
	// progress. Zero or negative means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout, when positive, is how long a pooled connection may
	// stay idle before it is closed.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is not consulted in the visible part of this
	// file; presumably it bounds the wait for response headers - TODO
	// confirm.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is not consulted in the visible part of this
	// file; presumably it bounds the wait for a "100 Continue" reply -
	// TODO confirm.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a negotiated protocol name to a function that
	// produces the RoundTripper for that connection. An existing "h2"
	// entry stops onceSetNextProtoDefaults from installing the bundled
	// HTTP/2 support.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is copied by Clone; its use is outside this
	// view (presumably headers sent on proxy CONNECT requests - confirm).
	ProxyConnectHeader Header

	// GetProxyConnectHeader is not called in the visible part of this
	// file; presumably it supplies CONNECT headers per proxy and target -
	// TODO confirm.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes, when non-zero, is propagated to the HTTP/2
	// transport's MaxHeaderListSize if that is unset, capped at 1<<32-1.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size; values <= 0 select 4 KiB.
	WriteBufferSize int

	// ReadBufferSize is the read buffer size; values <= 0 select 4 KiB.
	ReadBufferSize int

	// nextProtoOnce guards the single run of onceSetNextProtoDefaults.
	nextProtoOnce sync.Once
	// h2transport is the HTTP/2 transport chosen by
	// onceSetNextProtoDefaults, if any.
	h2transport h2Transport
	// tlsNextProtoWasNil records whether TLSNextProto was nil when
	// onceSetNextProtoDefaults ran.
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 keeps HTTP/2 enabled by default even when a
	// custom TLSClientConfig, Dial, DialContext, DialTLS or
	// DialTLSContext is set (see protocols).
	ForceAttemptHTTP2 bool

	// HTTP2 is copied by Clone; its use is outside this view.
	HTTP2 *HTTP2Config

	// Protocols, when non-nil, is returned as-is by protocols and thereby
	// overrides the default protocol selection.
	Protocols *Protocols
}
313
314 func (t *Transport) writeBufferSize() int {
315 if t.WriteBufferSize > 0 {
316 return t.WriteBufferSize
317 }
318 return 4 << 10
319 }
320
321 func (t *Transport) readBufferSize() int {
322 if t.ReadBufferSize > 0 {
323 return t.ReadBufferSize
324 }
325 return 4 << 10
326 }
327
328
329 func (t *Transport) Clone() *Transport {
330 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
331 t2 := &Transport{
332 Proxy: t.Proxy,
333 OnProxyConnectResponse: t.OnProxyConnectResponse,
334 DialContext: t.DialContext,
335 Dial: t.Dial,
336 DialTLS: t.DialTLS,
337 DialTLSContext: t.DialTLSContext,
338 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
339 DisableKeepAlives: t.DisableKeepAlives,
340 DisableCompression: t.DisableCompression,
341 MaxIdleConns: t.MaxIdleConns,
342 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
343 MaxConnsPerHost: t.MaxConnsPerHost,
344 IdleConnTimeout: t.IdleConnTimeout,
345 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
346 ExpectContinueTimeout: t.ExpectContinueTimeout,
347 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
348 GetProxyConnectHeader: t.GetProxyConnectHeader,
349 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
350 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
351 WriteBufferSize: t.WriteBufferSize,
352 ReadBufferSize: t.ReadBufferSize,
353 }
354 if t.TLSClientConfig != nil {
355 t2.TLSClientConfig = t.TLSClientConfig.Clone()
356 }
357 if t.HTTP2 != nil {
358 t2.HTTP2 = &HTTP2Config{}
359 *t2.HTTP2 = *t.HTTP2
360 }
361 if t.Protocols != nil {
362 t2.Protocols = &Protocols{}
363 *t2.Protocols = *t.Protocols
364 }
365 if !t.tlsNextProtoWasNil {
366 npm := maps.Clone(t.TLSNextProto)
367 if npm == nil {
368 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
369 }
370 t2.TLSNextProto = npm
371 }
372 return t2
373 }
374
375
376
377
378
379
380
// h2Transport is the part of an HTTP/2 transport that Transport needs:
// the ability to forward CloseIdleConnections to it.
type h2Transport interface {
	CloseIdleConnections()
}
384
385 func (t *Transport) hasCustomTLSDialer() bool {
386 return t.DialTLS != nil || t.DialTLSContext != nil
387 }
388
// http2client is the GODEBUG setting "http2client". In this file the
// value "0" disables the HTTP/2 client setup (see onceSetNextProtoDefaults
// and protocols).
var http2client = godebug.New("http2client")
390
391
392
// onceSetNextProtoDefaults performs the one-time HTTP/2 setup for t. It is
// run through t.nextProtoOnce by Clone, roundTrip and CloseIdleConnections.
func (t *Transport) onceSetNextProtoDefaults() {
	// Remember whether TLSNextProto was unset before any setup; Clone
	// consults this flag.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	// GODEBUG http2client=0 turns the setup off; the non-default use is
	// counted.
	if http2client.Value() == "0" {
		http2client.IncNonDefault()
		return
	}

	// If the round tripper registered for "https" is a struct with exactly
	// one field, and that field can be read and implements h2Transport,
	// adopt it as the HTTP/2 transport and stop here.
	// NOTE(review): presumably this recognizes an externally configured
	// HTTP/2 transport wrapper - confirm against the registering code.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(h2Transport); ok {
				t.h2transport = h2i
				return
			}
		}
	}

	// An "h2" entry is already present in TLSNextProto: leave it alone.
	if _, ok := t.TLSNextProto["h2"]; ok {

		return
	}
	// Nothing to do unless HTTP/2 (encrypted or unencrypted) is enabled.
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		return
	}
	// omitBundledHTTP2 is declared outside this view; presumably a
	// build-time switch - confirm.
	if omitBundledHTTP2 {
		return
	}
	t2, err := http2configureTransports(t)
	if err != nil {
		// Setup failure is logged, not returned; t continues without
		// h2transport set.
		log.Printf("Error enabling Transport HTTP/2 support: %v", err)
		return
	}
	t.h2transport = t2

	// Carry MaxResponseHeaderBytes over to the HTTP/2 transport when that
	// has no header-list limit of its own, saturating at the uint32
	// maximum.
	if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
		const h2max = 1<<32 - 1
		if limit1 >= h2max {
			t2.MaxHeaderListSize = h2max
		} else {
			t2.MaxHeaderListSize = uint32(limit1)
		}
	}

	// NOTE(review): t.TLSClientConfig is dereferenced unconditionally;
	// presumably http2configureTransports guarantees it is non-nil by
	// this point - confirm.
	t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
}
455
456 func (t *Transport) protocols() Protocols {
457 if t.Protocols != nil {
458 return *t.Protocols
459 }
460 var p Protocols
461 p.SetHTTP1(true)
462 switch {
463 case t.TLSNextProto != nil:
464
465
466 if t.TLSNextProto["h2"] != nil {
467 p.SetHTTP2(true)
468 }
469 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
470
471
472
473
474
475
476 case http2client.Value() == "0":
477 default:
478 p.SetHTTP2(true)
479 }
480 return p
481 }
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
500 return envProxyFunc()(req.URL)
501 }
502
503
504
505 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
506 return func(*Request) (*url.URL, error) {
507 return fixedURL, nil
508 }
509 }
510
511
512
513
// transportRequest wraps a *Request with the per-attempt state the
// transport needs. roundTrip creates a fresh one for every attempt.
type transportRequest struct {
	*Request
	// extra holds additional headers; it is allocated lazily by
	// extraHeaders.
	extra Header
	// trace is the client trace taken from the request context, if any.
	trace *httptrace.ClientTrace

	// ctx is the cancelable context derived from the request context in
	// roundTrip, and cancel is the function that cancels it with a cause.
	ctx    context.Context
	cancel context.CancelCauseFunc

	// mu guards err, the first error recorded with setError.
	mu  sync.Mutex
	err error
}
525
526 func (tr *transportRequest) extraHeaders() Header {
527 if tr.extra == nil {
528 tr.extra = make(Header)
529 }
530 return tr.extra
531 }
532
533 func (tr *transportRequest) setError(err error) {
534 tr.mu.Lock()
535 if tr.err == nil {
536 tr.err = err
537 }
538 tr.mu.Unlock()
539 }
540
541
542
543 func (t *Transport) useRegisteredProtocol(req *Request) bool {
544 if req.URL.Scheme == "https" && req.requiresHTTP1() {
545
546
547
548
549 return false
550 }
551 return true
552 }
553
554
555
556
557 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
558 if !t.useRegisteredProtocol(req) {
559 return nil
560 }
561 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
562 return altProto[req.URL.Scheme]
563 }
564
565 func validateHeaders(hdrs Header) string {
566 for k, vv := range hdrs {
567 if !httpguts.ValidHeaderFieldName(k) {
568 return fmt.Sprintf("field name %q", k)
569 }
570 for _, v := range vv {
571 if !httpguts.ValidHeaderFieldValue(v) {
572
573
574 return fmt.Sprintf("field value for %q", k)
575 }
576 }
577 }
578 return ""
579 }
580
581
// roundTrip sends req and returns its response. It validates the request,
// offers it to a registered alternate round tripper, and otherwise obtains
// a connection and performs the exchange, retrying on a new connection
// when shouldRetryRequest allows it. On every error return the request
// body has been closed (or could not be rewound) and the request's derived
// context is canceled with the returned error.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	// Header and trailer validation applies only to http and https; other
	// schemes are left to a registered alternate round tripper.
	if isHTTP {

		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request for cancellation bookkeeping and for
	// Response.Request; work on a copy whose body tracks reads and closes.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		// Any result other than ErrSkipAltProtocol is final.
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate declined; restore the body before continuing.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	// An empty method is accepted here; only non-empty methods are checked.
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// From here on the request runs under a context this function can
	// cancel with a cause.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Bridge the legacy Request.Cancel channel onto that context.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Register the request so CancelRequest can find it; the returned
	// cancel also removes the registration.
	cancel = t.prepareTransportCancel(origReq, cancel)

	// This reads the named result, so it fires for every "return nil, e"
	// below even where a local err shadows it.
	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		// Stop before each attempt if the request was already canceled.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// A fresh transportRequest is built for every attempt.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Obtain a connection: a pooled idle one or a newly dialed one.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is served by an alternate round tripper.
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// For alternate round trippers the derived context is
				// released right away with errRequestDone.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed: clean up and decide whether to retry.
		if http2isNoCachedConnError(err) {
			// Drop the connection from the idle pool; release its
			// per-host slot only if it was actually pooled.
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retryable: unwrap the internal marker error types so
			// the caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			// Close the body if nothing else has done so yet.
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {

				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Restore the body for the next attempt; give up if that is not
		// possible.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
742
743 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
744 select {
745 case <-req.Cancel:
746 cancel(errRequestCanceled)
747 case <-ctx.Done():
748 }
749 }
750
// errCannotRewind is returned by rewindBody when a request body was
// already read or closed and the request has no GetBody to recreate it.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
752
// readTrackingBody wraps a request body and remembers whether Read or
// Close has ever been called on it, so the transport can tell whether
// the body must be rewound before a retry.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool // set by the first call to Read
	didClose bool // set by the first call to Close
}

// Read marks the body as read and forwards to the wrapped body.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close marks the body as closed and forwards to the wrapped body.
func (r *readTrackingBody) Close() error {
	r.didClose = true
	err := r.ReadCloser.Close()
	return err
}
768
769
770
771
772
773 func setupRewindBody(req *Request) *Request {
774 if req.Body == nil || req.Body == NoBody {
775 return req
776 }
777 newReq := *req
778 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
779 return &newReq
780 }
781
782
783
784
785
786 func rewindBody(req *Request) (rewound *Request, err error) {
787 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
788 return req, nil
789 }
790 if !req.Body.(*readTrackingBody).didClose {
791 req.closeBody()
792 }
793 if req.GetBody == nil {
794 return nil, errCannotRewind
795 }
796 body, err := req.GetBody()
797 if err != nil {
798 return nil, err
799 }
800 newReq := *req
801 newReq.Body = &readTrackingBody{ReadCloser: body}
802 return &newReq, nil
803 }
804
805
806
807
808 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
809 if http2isNoCachedConnError(err) {
810
811
812
813
814
815
816 return true
817 }
818 if err == errMissingHost {
819
820 return false
821 }
822 if !pc.isReused() {
823
824
825
826
827
828
829
830 return false
831 }
832 if _, ok := err.(nothingWrittenError); ok {
833
834
835 return req.outgoingLength() == 0 || req.GetBody != nil
836 }
837 if !req.isReplayable() {
838
839 return false
840 }
841 if _, ok := err.(transportReadFromServerError); ok {
842
843
844 return true
845 }
846 if err == errServerClosedIdle {
847
848
849
850 return true
851 }
852 return false
853 }
854
855
// ErrSkipAltProtocol is a sentinel error. When a registered alternate
// round tripper returns it from RoundTrip, Transport.roundTrip ignores
// that round tripper's result and handles the request itself.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
857
858
859
860
861
862
863
864
865
866
867
868 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
869 t.altMu.Lock()
870 defer t.altMu.Unlock()
871 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
872 if _, exists := oldMap[scheme]; exists {
873 panic("protocol " + scheme + " already registered")
874 }
875 newMap := maps.Clone(oldMap)
876 if newMap == nil {
877 newMap = make(map[string]RoundTripper)
878 }
879 newMap[scheme] = rt
880 t.altProto.Store(newMap)
881 }
882
883
884
885
886
887 func (t *Transport) CloseIdleConnections() {
888 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
889 t.idleMu.Lock()
890 m := t.idleConn
891 t.idleConn = nil
892 t.closeIdle = true
893 t.idleLRU = connLRU{}
894 t.idleMu.Unlock()
895 for _, conns := range m {
896 for _, pconn := range conns {
897 pconn.close(errCloseIdleConns)
898 }
899 }
900 t.connsPerHostMu.Lock()
901 t.dialsInProgress.all(func(w *wantConn) {
902 if w.cancelCtx != nil && !w.waiting() {
903 w.cancelCtx()
904 }
905 })
906 t.connsPerHostMu.Unlock()
907 if t2 := t.h2transport; t2 != nil {
908 t2.CloseIdleConnections()
909 }
910 }
911
912
913 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
914
915
916
917
918
919
920 cancel := func(err error) {
921 origCancel(err)
922 t.reqMu.Lock()
923 delete(t.reqCanceler, req)
924 t.reqMu.Unlock()
925 }
926 t.reqMu.Lock()
927 if t.reqCanceler == nil {
928 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
929 }
930 t.reqCanceler[req] = cancel
931 t.reqMu.Unlock()
932 return cancel
933 }
934
935
936
937
938
939
940
941 func (t *Transport) CancelRequest(req *Request) {
942 t.reqMu.Lock()
943 cancel := t.reqCanceler[req]
944 t.reqMu.Unlock()
945 if cancel != nil {
946 cancel(errRequestCanceled)
947 }
948 }
949
950
951
952
953
var (
	// envProxyOnce guards the single initialization of envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the proxy function built from the environment;
	// it is set by envProxyFunc and cleared by resetProxyConfig.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
958
959
960
961 func envProxyFunc() func(*url.URL) (*url.URL, error) {
962 envProxyOnce.Do(func() {
963 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
964 })
965 return envProxyFuncValue
966 }
967
968
969 func resetProxyConfig() {
970 envProxyOnce = sync.Once{}
971 envProxyFuncValue = nil
972 }
973
974 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
975 cm.targetScheme = treq.URL.Scheme
976 cm.targetAddr = canonicalAddr(treq.URL)
977 if t.Proxy != nil {
978 cm.proxyURL, err = t.Proxy(treq.Request)
979 }
980 cm.onlyH1 = treq.requiresHTTP1()
981 return cm, err
982 }
983
984
985
986 func (cm *connectMethod) proxyAuth() string {
987 if cm.proxyURL == nil {
988 return ""
989 }
990 if u := cm.proxyURL.User; u != nil {
991 username := u.Username()
992 password, _ := u.Password()
993 return "Basic " + basicAuth(username, password)
994 }
995 return ""
996 }
997
998
// Sentinel errors for connection pooling. The first five are the reasons
// tryPutIdleConn gives for not pooling a connection (errTooManyIdle is
// used for the connection evicted to make room).
var (
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
	errCloseIdle          = errors.New("http: putIdleConn: CloseIdleConnections was called")
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")
	// errCloseIdleConns is the reason given when CloseIdleConnections
	// closes a pooled connection.
	errCloseIdleConns = errors.New("http: CloseIdleConnections called")
	// errReadLoopExiting and errIdleConnTimeout are used outside this
	// view.
	errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout = errors.New("http: idle connection timeout")

	// errServerClosedIdle makes shouldRetryRequest retry a replayable
	// request on a reused connection.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1015
1016
1017
1018
1019
1020
1021
1022
1023
// transportReadFromServerError wraps an error encountered while reading
// from the server. shouldRetryRequest treats it as retryable, and
// roundTrip unwraps it before returning a non-retried failure.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the wrapped error's text.
func (e transportReadFromServerError) Error() string {
	cause := e.err
	return fmt.Sprintf("net/http: Transport failed to read from server: %v", cause)
}
1033
1034 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1035 if err := t.tryPutIdleConn(pconn); err != nil {
1036 pconn.close(err)
1037 }
1038 }
1039
1040 func (t *Transport) maxIdleConnsPerHost() int {
1041 if v := t.MaxIdleConnsPerHost; v != 0 {
1042 return v
1043 }
1044 return DefaultMaxIdleConnsPerHost
1045 }
1046
1047
1048
1049
1050
1051
// tryPutIdleConn offers pconn for reuse. It first tries to hand pconn
// directly to a request waiting for a connection with the same key; if
// nobody takes it, pconn is added to the idle pool. A nil result means
// pconn was handed over or pooled; a non-nil result is the reason it was
// refused, and the caller is then responsible for the connection (see
// putOrCloseIdleConn).
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	// Keep-alives off, or a negative per-host limit: never pool.
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A connection served by an alternate round tripper that is already
	// tracked in the LRU needs no further work.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Look for waiters before pooling.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// Ordinary connection: give it to the first waiter that
			// accepts it, then stop.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// Alternate-round-tripper connection: offer it to every
			// waiter, and still fall through to pool it below.
			// NOTE(review): presumably such connections can serve many
			// requests at once - confirm.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		// q is a copy of the map value; write it back (or drop the entry).
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	// CloseIdleConnections was called and no request has asked for a
	// connection since.
	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Pooling the same connection twice is an internal error and
	// terminates the program.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Over the global limit (zero means unlimited): evict the oldest.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm (or re-arm) the idle timer; it is not used for connections
	// served by an alternate round tripper.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1144
1145
1146
1147
// queueForIdleConn tries to satisfy w from the idle pool. It reports true
// if an idle connection was delivered to w. Otherwise w is appended to the
// wait queue for its key, so that a connection becoming idle later can be
// delivered to it, and false is returned. With keep-alives disabled it
// does nothing and returns false. A nil w only clears the closeIdle flag.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A request for a connection ends the "refuse idle connections" state
	// set by CloseIdleConnections.
	t.closeIdle = false

	if w == nil {

		return false
	}

	// oldTime is the cutoff before which an idle connection counts as
	// expired; it stays zero when there is no idle timeout.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Scan the idle list for this key from the most recently added end.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		// This local deliberately shadows the named result; every return
		// below names its value explicitly.
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading before the
			// comparison.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Close it in the background; closeConnIfStillIdle is
				// not called while holding idleMu here.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Unusable: drop it from the list and try the next one.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Connections served by an alternate round tripper
					// stay in the pool after delivery.
				} else {
					// Ordinary connections leave the pool once handed out.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			// Whether or not w accepted, stop after the first usable
			// connection.
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w as a waiter.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1233
1234
1235 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1236 t.idleMu.Lock()
1237 defer t.idleMu.Unlock()
1238 return t.removeIdleConnLocked(pconn)
1239 }
1240
1241
1242 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1243 if pconn.idleTimer != nil {
1244 pconn.idleTimer.Stop()
1245 }
1246 t.idleLRU.remove(pconn)
1247 key := pconn.cacheKey
1248 pconns := t.idleConn[key]
1249 var removed bool
1250 switch len(pconns) {
1251 case 0:
1252
1253 case 1:
1254 if pconns[0] == pconn {
1255 delete(t.idleConn, key)
1256 removed = true
1257 }
1258 default:
1259 for i, v := range pconns {
1260 if v != pconn {
1261 continue
1262 }
1263
1264
1265 copy(pconns[i:], pconns[i+1:])
1266 t.idleConn[key] = pconns[:len(pconns)-1]
1267 removed = true
1268 break
1269 }
1270 }
1271 return removed
1272 }
1273
// zeroDialer is the zero-value net.Dialer that dial uses when neither
// DialContext nor Dial is set.
var zeroDialer net.Dialer
1275
1276 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1277 if t.DialContext != nil {
1278 c, err := t.DialContext(ctx, network, addr)
1279 if c == nil && err == nil {
1280 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1281 }
1282 return c, err
1283 }
1284 if t.Dial != nil {
1285 c, err := t.Dial(network, addr)
1286 if c == nil && err == nil {
1287 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1288 }
1289 return c, err
1290 }
1291 return zeroDialer.DialContext(ctx, network, addr)
1292 }
1293
1294
1295
1296
1297
1298
1299
// wantConn is one request's pending wish for a connection. It is created
// by getConn, may sit in both the idle-wait and the dial queues, and is
// resolved at most once: either tryDeliver sends a result, or cancel
// closes the result channel.
type wantConn struct {
	cm  connectMethod
	key connectMethodKey

	// beforeDial and afterDial are hooks run by queueForDial and
	// dialConnFor; getConn sets them to the test hooks.
	beforeDial func()
	afterDial  func()

	// mu guards ctx, done and sending on or closing result.
	mu sync.Mutex
	// ctx is the context for dialing; it is set to nil once the wantConn
	// is resolved (delivered or canceled).
	ctx context.Context
	// cancelCtx cancels ctx. It is set to nil by startDialConnForLocked's
	// goroutine once the dial has finished (under connsPerHostMu).
	cancelCtx context.CancelFunc
	// done is true once the wantConn has been delivered to or canceled.
	done bool
	// result carries the outcome; getConn creates it with capacity 1.
	result chan connOrError
}
1316
// connOrError is the value delivered on wantConn.result: exactly one of
// pc and err is set (tryDeliver enforces this).
type connOrError struct {
	pc  *persistConn
	err error
	// idleAt is when the connection became idle; it is the zero time for
	// connections that did not come from the idle pool.
	idleAt time.Time
}
1322
1323
1324 func (w *wantConn) waiting() bool {
1325 w.mu.Lock()
1326 defer w.mu.Unlock()
1327
1328 return !w.done
1329 }
1330
1331
1332 func (w *wantConn) getCtxForDial() context.Context {
1333 w.mu.Lock()
1334 defer w.mu.Unlock()
1335
1336 return w.ctx
1337 }
1338
1339
1340 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1341 w.mu.Lock()
1342 defer w.mu.Unlock()
1343
1344 if w.done {
1345 return false
1346 }
1347 if (pc == nil) == (err == nil) {
1348 panic("net/http: internal error: misuse of tryDeliver")
1349 }
1350 w.ctx = nil
1351 w.done = true
1352
1353 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1354 close(w.result)
1355
1356 return true
1357 }
1358
1359
1360
1361 func (w *wantConn) cancel(t *Transport, err error) {
1362 w.mu.Lock()
1363 var pc *persistConn
1364 if w.done {
1365 if r, ok := <-w.result; ok {
1366 pc = r.pc
1367 }
1368 } else {
1369 close(w.result)
1370 }
1371 w.ctx = nil
1372 w.done = true
1373 w.mu.Unlock()
1374
1375
1376
1377
1378 if pc != nil && pc.alt == nil {
1379 t.putOrCloseIdleConn(pc)
1380 }
1381 }
1382
1383
1384 type wantConnQueue struct {
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395 head []*wantConn
1396 headPos int
1397 tail []*wantConn
1398 }
1399
1400
1401 func (q *wantConnQueue) len() int {
1402 return len(q.head) - q.headPos + len(q.tail)
1403 }
1404
1405
1406 func (q *wantConnQueue) pushBack(w *wantConn) {
1407 q.tail = append(q.tail, w)
1408 }
1409
1410
1411 func (q *wantConnQueue) popFront() *wantConn {
1412 if q.headPos >= len(q.head) {
1413 if len(q.tail) == 0 {
1414 return nil
1415 }
1416
1417 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1418 }
1419 w := q.head[q.headPos]
1420 q.head[q.headPos] = nil
1421 q.headPos++
1422 return w
1423 }
1424
1425
1426 func (q *wantConnQueue) peekFront() *wantConn {
1427 if q.headPos < len(q.head) {
1428 return q.head[q.headPos]
1429 }
1430 if len(q.tail) > 0 {
1431 return q.tail[0]
1432 }
1433 return nil
1434 }
1435
1436
1437
1438 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1439 for {
1440 w := q.peekFront()
1441 if w == nil || w.waiting() {
1442 return cleaned
1443 }
1444 q.popFront()
1445 cleaned = true
1446 }
1447 }
1448
1449
1450 func (q *wantConnQueue) cleanFrontCanceled() {
1451 for {
1452 w := q.peekFront()
1453 if w == nil || w.cancelCtx != nil {
1454 return
1455 }
1456 q.popFront()
1457 }
1458 }
1459
1460
1461
1462 func (q *wantConnQueue) all(f func(*wantConn)) {
1463 for _, w := range q.head[q.headPos:] {
1464 f(w)
1465 }
1466 for _, w := range q.tail {
1467 f(w)
1468 }
1469 }
1470
1471 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1472 if t.DialTLSContext != nil {
1473 conn, err = t.DialTLSContext(ctx, network, addr)
1474 } else {
1475 conn, err = t.DialTLS(network, addr)
1476 }
1477 if conn == nil && err == nil {
1478 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1479 }
1480 return
1481 }
1482
1483
1484
1485
1486
// getConn obtains a connection for treq using connect method cm: an idle
// connection from the pool if one is available, otherwise the result of a
// dial. It blocks until a connection or error arrives or the request's
// context is done. On an error return the pending wantConn is canceled,
// which hands a connection that arrives too late back to the pool.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial context keeps the request context's values but is detached
	// from its cancellation; it is canceled only through dialCancel.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	// Uses the named result, so it runs for every error return below.
	defer func() {
		if err != nil {
			w.cancel(t, err)
		}
	}()

	// Try the idle pool first; if nothing was delivered, also queue a
	// dial. w stays registered as an idle waiter in that case.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for whichever comes first: a result or request cancellation.
	select {
	case r := <-w.result:

		// GotConn is traced here only for ordinary connections, not for
		// those served by an alternate round tripper.
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {

			// If the request was canceled as well, report the
			// cancellation cause rather than the dial error.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:

			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1562
1563
1564
1565 func (t *Transport) queueForDial(w *wantConn) {
1566 w.beforeDial()
1567
1568 t.connsPerHostMu.Lock()
1569 defer t.connsPerHostMu.Unlock()
1570
1571 if t.MaxConnsPerHost <= 0 {
1572 t.startDialConnForLocked(w)
1573 return
1574 }
1575
1576 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1577 if t.connsPerHost == nil {
1578 t.connsPerHost = make(map[connectMethodKey]int)
1579 }
1580 t.connsPerHost[w.key] = n + 1
1581 t.startDialConnForLocked(w)
1582 return
1583 }
1584
1585 if t.connsPerHostWait == nil {
1586 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1587 }
1588 q := t.connsPerHostWait[w.key]
1589 q.cleanFrontNotWaiting()
1590 q.pushBack(w)
1591 t.connsPerHostWait[w.key] = q
1592 }
1593
1594
1595
1596 func (t *Transport) startDialConnForLocked(w *wantConn) {
1597 t.dialsInProgress.cleanFrontCanceled()
1598 t.dialsInProgress.pushBack(w)
1599 go func() {
1600 t.dialConnFor(w)
1601 t.connsPerHostMu.Lock()
1602 defer t.connsPerHostMu.Unlock()
1603 w.cancelCtx = nil
1604 }()
1605 }
1606
1607
1608
1609
1610 func (t *Transport) dialConnFor(w *wantConn) {
1611 defer w.afterDial()
1612 ctx := w.getCtxForDial()
1613 if ctx == nil {
1614 t.decConnsPerHost(w.key)
1615 return
1616 }
1617
1618 pc, err := t.dialConn(ctx, w.cm)
1619 delivered := w.tryDeliver(pc, err, time.Time{})
1620 if err == nil && (!delivered || pc.alt != nil) {
1621
1622
1623
1624 t.putOrCloseIdleConn(pc)
1625 }
1626 if err != nil {
1627 t.decConnsPerHost(w.key)
1628 }
1629 }
1630
1631
1632
// decConnsPerHost releases one per-host connection slot for key. If a
// request is waiting for a slot on that key, the slot is transferred to it
// (a dial is started and the count is left unchanged); otherwise the count
// is decremented. It does nothing when MaxConnsPerHost is not positive,
// because no counts are kept in that case.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// Releasing a slot that was never taken is an internal error.
		panic("net/http: internal error: connCount underflow")
	}

	// Hand the slot to the first queued request that is still waiting;
	// entries that are no longer waiting are discarded on the way.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a copy of the map value; store the updated queue back.
			t.connsPerHostWait[key] = q
		}
		if done {
			// The slot was reused, so the count stays as it is.
			return
		}
	}

	// Nobody took the slot: lower the count, dropping the entry at zero.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1680
1681
1682
1683
// addTLS performs a TLS client handshake over pconn.conn and, on success,
// replaces pconn.conn with the TLS connection and records the connection
// state in pconn.tlsState. name is used as the TLS ServerName when the
// configuration does not set one. On failure (including a handshake
// timeout) the underlying connection is closed and the error returned.
// The TLSHandshakeStart/TLSHandshakeDone trace hooks are invoked if set.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {

	// Work on a copy so the Transport's own config is not modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	// HTTP/1-only connections do not offer any ALPN protocols.
	if pconn.cacheKey.onlyH1 {
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2: the timer and the handshake goroutine may each send
	// once without blocking.
	errc := make(chan error, 2)
	var timer *time.Timer
	// A zero TLSHandshakeTimeout means no timer is started.
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	// The first value is either the handshake result or the timeout.
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// On timeout, wait for the handshake goroutine to finish
			// (closing plainConn above makes the handshake return).
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1732
// erringRoundTripper is implemented by RoundTrippers that can report a
// setup error. dialConn checks the value returned by a TLSNextProto
// function against this interface and, on a match, fails the dial with
// RoundTripErr's result.
type erringRoundTripper interface {
	RoundTripErr() error
}
1736
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It is a variable, defaulting to
// context.WithTimeout, so it can be replaced (presumably by tests).
var testHookProxyConnectTimeout = context.WithTimeout
1738
// dialConn establishes a new connection described by cm and returns a
// persistConn for it.
//
// The steps are: dial the first hop (the proxy if cm.proxyURL is set,
// otherwise the target), adding TLS when that hop's scheme is "https";
// perform any proxy setup (SOCKS5 negotiation, or an HTTP CONNECT for
// https targets); add TLS to the target when tunnelling https through a
// proxy; and finally either hand the connection to a TLSNextProto
// alternate RoundTripper or start the HTTP/1 read and write loops.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr marks errors from the first hop as proxy connect failures
	// when a proxy is in use, so callers get a typed *net.OpError.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// The custom dialer returned a *tls.Conn; run the handshake
			// here so the connection state is available for the
			// TLSNextProto lookup further down.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			// NOTE(review): conn is not closed on this error path.
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			// addTLS closes the plain connection itself on failure.
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// No proxy: nothing to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			// The "is set" flag is ignored; an absent password is sent
			// as the empty string.
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests are sent to the
		// proxy directly, with proxy credentials added per request.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone before mutating so the caller-provided header map is
			// left untouched.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange so a proxy that stops replying
		// cannot block this dial indefinitely.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed once the CONNECT write+read finishes or fails
		var (
			resp *Response
			err  error // write or read error from the goroutine below
		)

		// Write the CONNECT request and read its response.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}

			// The buffered reader is discarded afterwards.
			// NOTE(review): this assumes the proxy sends nothing beyond
			// the CONNECT response before the client speaks — confirm.
			br := bufio.NewReader(conn)
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing conn unblocks the goroutine; wait for it so resp
			// and err are no longer being written.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err is now set.
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the code) as the error.
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With the tunnel established, negotiate TLS with the real target.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 is chosen only on a non-TLS connection when the
	// Transport's Protocols enable it and do not enable HTTP/1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()
	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// NOTE(review): conn is not closed here; presumably next
			// has already dealt with it — confirm.
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// If TLS negotiated a protocol that the Transport has a handler for,
	// return a persistConn that only carries the alternate RoundTripper.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// NOTE(review): conn is not closed here; presumably next
				// has already dealt with it — confirm.
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// Plain HTTP/1: reads go through pconn.Read (which enforces
	// readLimit) and writes through persistConnWriter (which counts
	// bytes written).
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
1951
1952
1953
1954
1955
1956
1957
1958 type persistConnWriter struct {
1959 pc *persistConn
1960 }
1961
1962 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1963 n, err = w.pc.conn.Write(p)
1964 w.pc.nwrite += int64(n)
1965 return
1966 }
1967
1968
1969
1970
1971 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1972 n, err = io.Copy(w.pc.conn, r)
1973 w.pc.nwrite += n
1974 return
1975 }
1976
1977 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
// connectMethod describes how to reach a target: directly, or through the
// proxy named by proxyURL. Its key method reduces it to the comparable
// connectMethodKey used to index connections.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // compared against "http" and "https" in this file

	// targetAddr is the address of the final destination. It is omitted
	// from the key when plain HTTP is sent via an HTTP(S) proxy.
	targetAddr string
	onlyH1     bool // copied into the key; makes addTLS offer no ALPN protocols
}
2005
2006 func (cm *connectMethod) key() connectMethodKey {
2007 proxyStr := ""
2008 targetAddr := cm.targetAddr
2009 if cm.proxyURL != nil {
2010 proxyStr = cm.proxyURL.String()
2011 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2012 targetAddr = ""
2013 }
2014 }
2015 return connectMethodKey{
2016 proxy: proxyStr,
2017 scheme: cm.targetScheme,
2018 addr: targetAddr,
2019 onlyH1: cm.onlyH1,
2020 }
2021 }
2022
2023
2024 func (cm *connectMethod) scheme() string {
2025 if cm.proxyURL != nil {
2026 return cm.proxyURL.Scheme
2027 }
2028 return cm.targetScheme
2029 }
2030
2031
2032 func (cm *connectMethod) addr() string {
2033 if cm.proxyURL != nil {
2034 return canonicalAddr(cm.proxyURL)
2035 }
2036 return cm.targetAddr
2037 }
2038
2039
2040
2041 func (cm *connectMethod) tlsHost() string {
2042 h := cm.targetAddr
2043 if hasPort(h) {
2044 h = h[:strings.LastIndex(h, ":")]
2045 }
2046 return h
2047 }
2048
2049
2050
2051
// connectMethodKey is the comparable form of a connectMethod, suitable for
// use as a map key.
type connectMethodKey struct {
	proxy  string
	scheme string
	addr   string
	onlyH1 bool
}

// String formats the key as "proxy|scheme|addr", appending ",h1" to the
// scheme part when the key is restricted to HTTP/1.
func (k connectMethodKey) String() string {
	schemePart := k.scheme
	if k.onlyH1 {
		schemePart += ",h1"
	}
	return fmt.Sprintf("%s|%s|%s", k.proxy, schemePart, k.addr)
}
2065
2066
2067
// persistConn wraps a single connection owned by a Transport, together
// with the channels and state used by its read and write loops.
type persistConn struct {
	// alt, when non-nil, is the RoundTripper returned by a TLSNextProto
	// function in dialConn. Such a persistConn is created with only t,
	// cacheKey and alt set, and closeLocked does not close conn or
	// closech for it.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey
	conn      net.Conn
	tlsState  *tls.ConnectionState // nil when the connection is not using TLS
	br        *bufio.Reader        // reads via persistConn.Read, so readLimit applies
	bw        *bufio.Writer        // writes via persistConnWriter, which updates nwrite
	nwrite    int64                // bytes written to conn
	reqch     chan requestAndChan  // sent by roundTrip; received by readLoop
	writech   chan writeRequest    // sent by roundTrip; received by writeLoop
	closech   chan struct{}        // closed by closeLocked for HTTP/1 connections
	isProxy   bool                 // set in dialConn for plain HTTP via a proxy
	sawEOF    bool                 // set by Read once conn returns io.EOF
	readLimit int64                // bytes Read may still return; reset by readLoop

	// writeErrCh receives the outcome of each request write from
	// writeLoop. readLoop consults it (through wroteRequest) before
	// returning the connection to the idle pool.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop returns

	// Idle bookkeeping; these fields are maintained outside this section
	// of the file.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // guards the fields below (as used in this file)
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // first error passed to closeLocked; nil while open
	canceledErr          error      // set by cancelRequest
	broken               bool       // set by closeLocked
	reused               bool       // set by markReused

	// mutateHeaderFunc, when non-nil, is applied by roundTrip to the
	// request's extra headers. dialConn uses it to add
	// Proxy-Authorization; closeLocked clears it.
	mutateHeaderFunc func(Header)
}
2110
2111 func (pc *persistConn) maxHeaderResponseSize() int64 {
2112 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2113 return v
2114 }
2115 return 10 << 20
2116 }
2117
2118 func (pc *persistConn) Read(p []byte) (n int, err error) {
2119 if pc.readLimit <= 0 {
2120 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2121 }
2122 if int64(len(p)) > pc.readLimit {
2123 p = p[:pc.readLimit]
2124 }
2125 n, err = pc.conn.Read(p)
2126 if err == io.EOF {
2127 pc.sawEOF = true
2128 }
2129 pc.readLimit -= int64(n)
2130 return
2131 }
2132
2133
2134 func (pc *persistConn) isBroken() bool {
2135 pc.mu.Lock()
2136 b := pc.closed != nil
2137 pc.mu.Unlock()
2138 return b
2139 }
2140
2141
2142
2143 func (pc *persistConn) canceled() error {
2144 pc.mu.Lock()
2145 defer pc.mu.Unlock()
2146 return pc.canceledErr
2147 }
2148
2149
2150 func (pc *persistConn) isReused() bool {
2151 pc.mu.Lock()
2152 r := pc.reused
2153 pc.mu.Unlock()
2154 return r
2155 }
2156
// cancelRequest records err as the cancellation cause and closes the
// connection with errRequestCanceled. The recorded cause is what
// canceled() later returns, letting callers report it instead of the
// generic close error.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2163
2164
2165
2166
2167 func (pc *persistConn) closeConnIfStillIdle() {
2168 t := pc.t
2169 t.idleMu.Lock()
2170 defer t.idleMu.Unlock()
2171 if _, ok := t.idleLRU.m[pc]; !ok {
2172
2173 return
2174 }
2175 t.removeIdleConnLocked(pc)
2176 pc.close(errIdleConnTimeout)
2177 }
2178
2179
2180
2181
2182
2183
2184
2185
2186
// mapRoundTripError translates an error observed during a round trip into
// the error that should be returned to the caller. startBytesWritten is
// the value of pc.nwrite when the round trip began; it is used to tell
// whether any bytes of this request reached the connection.
//
// Precedence, highest first: the cancellation cause, an error recorded on
// the request itself, errServerClosedIdle, then err, which is wrapped in
// nothingWrittenError if nothing was written or annotated as a broken
// connection if the connection is closed.
func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
	if err == nil {
		return nil
	}

	// Wait for the write loop to exit first: it is the goroutine that
	// updates pc.nwrite and may record an error on req, both of which are
	// read below.
	<-pc.writeLoopDone

	// A cancellation cause takes priority over whatever error the
	// cancellation went on to provoke.
	if cerr := pc.canceled(); cerr != nil {
		return cerr
	}

	// Next, prefer an error recorded on the request (for example one set
	// by writeLoop through setError).
	req.mu.Lock()
	reqErr := req.err
	req.mu.Unlock()
	if reqErr != nil {
		return reqErr
	}

	if err == errServerClosedIdle {
		// Returned unchanged so callers can recognise it.
		return err
	}

	if _, ok := err.(transportReadFromServerError); ok {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}

		return err
	}
	if pc.isBroken() {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
	}
	return err
}
2236
2237
2238
2239
// errCallerOwnsConn is the close error readLoop uses when the response
// body is writable. closeLocked special-cases it by not closing the
// underlying net.Conn.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2241
// readLoop runs in its own goroutine for the lifetime of an HTTP/1
// connection. Each iteration waits for data from the server, pairs it
// with the pending request received on pc.reqch, reads the response, and
// delivers it on the request's channel. The connection is returned to the
// idle pool when it can be reused; otherwise the loop ends and the
// deferred function closes the connection with closeErr.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // overwritten below with more specific reasons
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers pc back to the Transport's idle pool and
	// reports the outcome to the request's trace. On failure it records
	// the error as the close reason.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc releases body callbacks that block on it: it receives a value
	// after a body hits EOF and the idle-pool decision has been made, and
	// it is closed when readLoop returns.
	eofc := make(chan struct{})
	defer close(eofc)

	// Take a private copy of the test hook under testHookMu.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				// The read allowance ran out: report the header limit
				// instead of the underlying read error.
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // headers are done; the body is not limited

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// No keep-alive if either side asked to close, if the final
			// response is informational, or if the caller takes over the
			// connection through a writable body.
			alive = false
		}

		if !hasBody || bodyWritable {
			// Decide about idle reuse before delivering the response.
			// The conjuncts are evaluated in order, so tryPutIdleConn
			// only runs if every earlier condition holds.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// rc.ch is unbuffered (see roundTrip), so a successful send
			// means the caller has received the response.
			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body: wrap it so this loop learns when the
		// caller has finished with it. Capacity 2 lets both callbacks
		// send without blocking.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // unblocked when readLoop returns and closes eofc
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // wait for the idle-pool decision below
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The Transport requested gzip itself, so decompress
			// transparently and drop the headers that no longer describe
			// the body.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait until the body is fully read or closed, the request is
		// canceled, or the connection is closed, before looping to read
		// the next response.
		select {
		case bodyEOF := <-waitForBodyRead:
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2419
2420 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2421 if pc.closed != nil {
2422 return
2423 }
2424 if n := pc.br.Buffered(); n > 0 {
2425 buf, _ := pc.br.Peek(n)
2426 if is408Message(buf) {
2427 pc.closeLocked(errServerClosedIdle)
2428 return
2429 } else {
2430 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2431 }
2432 }
2433 if peekErr == io.EOF {
2434
2435 pc.closeLocked(errServerClosedIdle)
2436 } else {
2437 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2438 }
2439 }
2440
2441
2442
2443
// is408Message reports whether buf starts with an HTTP/1.x status line
// whose status code begins with 408. The minor version digit (the byte at
// index 7) is not inspected.
func is408Message(buf []byte) bool {
	const minLen = len("HTTP/1.x 408")
	if len(buf) < minLen {
		return false
	}
	hasPrefix := string(buf[:7]) == "HTTP/1."
	return hasPrefix && string(buf[8:12]) == " 408"
}
2453
2454
2455
2456
// readResponse reads the next final response for rc from the connection.
// Non-terminal 1xx responses are consumed here: they are reported to the
// trace and skipped. A 101 Switching Protocols response is treated as
// final. It also signals rc.continueCh so the writer knows whether to
// send the request body.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to go ahead, then clear the channel so it
			// is signalled at most once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 ends the loop: it is returned to the caller like a final response.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// The 1xx response was handed to the trace hook, so the
				// header allowance is restored for the next response.
				// Without a hook the limit is not reset and keeps
				// covering the 1xx responses and the final response
				// together.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// Give the caller a body it can both read and write, backed by
		// the raw connection plus any bytes already buffered.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// A final response arrived without a preceding 100 Continue.
		// waitForContinue treats a closed channel as "do not send the
		// body" and a received value as "send it": close when the
		// connection will not be kept open, otherwise let the body be
		// sent.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh)
		} else {
			continueCh <- struct{}{}
		}
	}

	resp.TLS = pc.tlsState
	return
}
2525
2526
2527
2528
2529 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2530 if continueCh == nil {
2531 return nil
2532 }
2533 return func() bool {
2534 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2535 defer timer.Stop()
2536
2537 select {
2538 case _, ok := <-continueCh:
2539 return ok
2540 case <-timer.C:
2541 return true
2542 case <-pc.closech:
2543 return false
2544 }
2545 }
2546 }
2547
2548 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2549 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2550 if br.Buffered() != 0 {
2551 body.br = br
2552 }
2553 return body
2554 }
2555
2556
2557
2558
2559
2560
// readWriteCloserBody is the response body used after a protocol switch.
// Reads drain any bytes left in br before going to the embedded
// ReadWriteCloser; writes and Close go straight to the embedded value.
type readWriteCloserBody struct {
	_  incomparable
	br *bufio.Reader // holds buffered bytes to serve first; nil once drained
	io.ReadWriteCloser
}
2566
2567 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2568 if b.br != nil {
2569 if n := b.br.Buffered(); len(p) > n {
2570 p = p[:n]
2571 }
2572 n, err = b.br.Read(p)
2573 if b.br.Buffered() == 0 {
2574 b.br = nil
2575 }
2576 return n, err
2577 }
2578 return b.ReadWriteCloser.Read(p)
2579 }
2580
2581
// nothingWrittenError wraps an error that occurred before any bytes of
// the request were written to the connection.
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error so errors.Is and errors.As can see it.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2589
// writeLoop runs in its own goroutine for the lifetime of an HTTP/1
// connection. It writes each request received on pc.writech and reports
// the outcome on both pc.writeErrCh (for readLoop) and the request's own
// channel (for roundTrip). It exits after the first write error or when
// the connection is closed, closing writeLoopDone on the way out.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body. Record
				// it on the request so mapRoundTripError can report it
				// in preference to any follow-on error.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // consumed by readLoop via wroteRequest
			wr.ch <- err         // consumed by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2627
2628
2629
2630
2631
2632
2633
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the
// write loop to report the result of the current request before giving up
// on reusing the connection.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2635
2636
2637
2638 func (pc *persistConn) wroteRequest() bool {
2639 select {
2640 case err := <-pc.writeErrCh:
2641
2642
2643 return err == nil
2644 default:
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2656 defer t.Stop()
2657 select {
2658 case err := <-pc.writeErrCh:
2659 return err == nil
2660 case <-t.C:
2661 return false
2662 }
2663 }
2664 }
2665
2666
2667
// responseAndError is what readLoop sends back to roundTrip for a
// request. roundTrip panics unless exactly one of res and err is set.
type responseAndError struct {
	_   incomparable
	res *Response // non-nil on success
	err error     // non-nil on failure
}
2673
// requestAndChan is the message roundTrip sends to readLoop on pc.reqch
// to announce a request whose response should be read.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered; the response or error is delivered here

	// addedGzip records that the Transport itself requested gzip. When
	// the response is gzip-encoded, readLoop then decompresses the body
	// transparently.
	addedGzip bool

	// continueCh, when non-nil, is signalled by readResponse to tell the
	// writer whether to send the request body of a request that expects
	// a 100 Continue.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2692
2693
2694
2695
2696
// writeRequest is the message roundTrip sends to writeLoop on pc.writech
// to have a request written to the connection.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the result of the write

	// continueCh, when non-nil, is the receive side of the channel
	// readResponse signals; writeLoop waits on it (through
	// waitForContinue) before sending the request body.
	continueCh <-chan struct{}
}
2706
2707
2708
// timeoutError is an error that reports itself as a timeout and matches
// context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string
}

// Error returns the stored message.
func (e *timeoutError) Error() string {
	return e.err
}

// Timeout always reports true.
func (e *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (e *timeoutError) Temporary() bool {
	return true
}

// Is reports whether err is context.DeadlineExceeded.
func (e *timeoutError) Is(err error) bool {
	return err == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when ResponseHeaderTimeout expires.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2719
2720
2721
// errRequestCanceled is the close error used by cancelRequest. It aliases
// the HTTP/2 value so both protocol implementations share one error.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is the error for a request canceled while it was
// still waiting for a connection (per its message; it is not used in this
// section of the file).
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")

// errRequestDone is the cause readLoop passes when it cancels a request's
// context after the response has been delivered.
var errRequestDone = errors.New("net/http: request completed")
2728
// nop does nothing. It is the default value of the test hooks below.
func nop() {}
2730
2731
// Test hooks. Each defaults to nop and is called at the point its name
// describes; they are variables so they can be replaced (presumably by
// tests).
var (
	testHookEnterRoundTrip   = nop // called at the start of persistConn.roundTrip
	testHookWaitResLoop      = nop // called on each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is locked by readLoop while it copies
	// testHookReadLoopBeforeNextRead. The default fakeLocker makes that
	// locking a no-op.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop
)
2742
// roundTrip sends req on this HTTP/1 connection and waits for its
// response. It hands the request to writeLoop and readLoop, then waits
// for whichever comes first: a write error, the response, a response
// header timeout, cancellation of the request's context, or the
// connection being closed. Errors are passed through mapRoundTripError
// before being returned, except for the header timeout.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip transparently, but only when the caller did not choose
	// an Accept-Encoding, is not making a Range request, and the method
	// is not HEAD. readLoop undoes the compression when addedGzip is set.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh carries the "send the body or not" decision from
	// readResponse to the writer for requests that expect 100 Continue.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed on return so readLoop stops trying to deliver a
	// response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// The request is written concurrently with waiting for the response,
	// so a response that arrives before the write completes is still
	// handled.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError) // unbuffered; readLoop relies on that
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse checks that readLoop's message is well formed and
	// converts it into roundTrip's return values.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil until the write has succeeded
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request has been written: start the response header
			// timeout, if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// A response became available together with the close;
				// prefer delivering it.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// A response became available together with the
				// cancellation; prefer delivering it.
				return handleResponse(re)
			default:
			}
			// Cancelling closes the connection, so the next iteration
			// returns through the pcClosed case.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
2880
2881
2882
2883 type tLogKey struct{}
2884
2885 func (tr *transportRequest) logf(format string, args ...any) {
2886 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2887 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2888 }
2889 }
2890
2891
2892
2893 func (pc *persistConn) markReused() {
2894 pc.mu.Lock()
2895 pc.reused = true
2896 pc.mu.Unlock()
2897 }
2898
2899
2900
2901
2902
2903
// close closes the connection with the given reason, which must be
// non-nil. Only the first reason is kept (see closeLocked). It acquires
// pc.mu; callers already holding the lock must use closeLocked instead.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
2909
// closeLocked marks the connection broken and, the first time it is
// called, records err as the close reason, releases the per-host
// connection slot, and shuts down an HTTP/1 connection. The caller must
// hold pc.mu. It panics if err is nil.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	pc.broken = true
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Only HTTP/1 connections (pc.alt == nil) own conn and closech;
		// a persistConn wrapping an alternate RoundTripper has neither.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				// With errCallerOwnsConn the caller keeps using the
				// net.Conn, so it must stay open.
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
2929
// portMap gives the default port for each URL scheme the Transport knows.
// canonicalAddr consults it when a URL has no explicit port.
var portMap = map[string]string{
	"http":    "80",
	"https":   "443",
	"socks5":  "1080",
	"socks5h": "1080",
}
2936
2937 func idnaASCIIFromURL(url *url.URL) string {
2938 addr := url.Hostname()
2939 if v, err := idnaASCII(addr); err == nil {
2940 addr = v
2941 }
2942 return addr
2943 }
2944
2945
2946 func canonicalAddr(url *url.URL) string {
2947 port := url.Port()
2948 if port == "" {
2949 port = portMap[url.Scheme]
2950 }
2951 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2952 }
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
// bodyEOFSignal wraps a response body and notifies its owner when the
// body has been consumed or abandoned.
//
// fn is called at most once: with the first non-nil error returned by the
// wrapped body's Read (including io.EOF), or with the result of closing
// the wrapped body. earlyCloseFn, when set, is called instead if Close
// happens before io.EOF has been seen. Whatever the callback returns is
// passed on to the caller of Read or Close.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // Close has been called
	rerr         error             // first non-nil error from body.Read, sticky
	fn           func(error) error // cleared after its first call
	earlyCloseFn func() error      // optional; used for a Close before EOF
}

// errReadOnClosedResBody is returned by Read after Close has been called.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the wrapped body. Once the body has been closed, or a
// previous Read failed, it returns the corresponding error without
// touching the wrapped body again.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, prevErr := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case prevErr != nil:
		return 0, prevErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed. Only the first call has any effect. If
// io.EOF has not been seen and earlyCloseFn is set, its result is
// returned; otherwise the wrapped body is closed and the outcome is
// routed through fn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	sawEOF := es.rerr == io.EOF
	if !sawEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn passes err through fn if fn is still set, clearing fn afterwards
// so that it runs only once. The caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	if es.fn == nil {
		return err
	}
	result := es.fn(err)
	es.fn = nil
	return result
}
3022
3023
3024
3025 type gzipReader struct {
3026 _ incomparable
3027 body *bodyEOFSignal
3028 zr *gzip.Reader
3029 zerr error
3030 }
3031
3032 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3033 if gz.zr == nil {
3034 if gz.zerr == nil {
3035 gz.zr, gz.zerr = gzip.NewReader(gz.body)
3036 }
3037 if gz.zerr != nil {
3038 return 0, gz.zerr
3039 }
3040 }
3041
3042 gz.body.mu.Lock()
3043 if gz.body.closed {
3044 err = errReadOnClosedResBody
3045 }
3046 gz.body.mu.Unlock()
3047
3048 if err != nil {
3049 return 0, err
3050 }
3051 return gz.zr.Read(p)
3052 }
3053
3054 func (gz *gzipReader) Close() error {
3055 return gz.body.Close()
3056 }
3057
// tlsHandshakeTimeoutError is the error addTLS reports when the handshake
// does not finish within TLSHandshakeTimeout. It has no fields, so values
// can be compared with ==.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3063
3064
3065
3066
// fakeLocker is a sync.Locker whose Lock and Unlock do nothing. It is the
// default value of testHookMu.
type fakeLocker struct{}

func (fakeLocker) Lock()   {}
func (fakeLocker) Unlock() {}
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
// cloneTLSConfig returns a copy of cfg that the caller may modify freely.
// A nil cfg yields a new, empty tls.Config.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return &tls.Config{}
}
3091
3092 type connLRU struct {
3093 ll *list.List
3094 m map[*persistConn]*list.Element
3095 }
3096
3097
3098 func (cl *connLRU) add(pc *persistConn) {
3099 if cl.ll == nil {
3100 cl.ll = list.New()
3101 cl.m = make(map[*persistConn]*list.Element)
3102 }
3103 ele := cl.ll.PushFront(pc)
3104 if _, ok := cl.m[pc]; ok {
3105 panic("persistConn was already in LRU")
3106 }
3107 cl.m[pc] = ele
3108 }
3109
3110 func (cl *connLRU) removeOldest() *persistConn {
3111 ele := cl.ll.Back()
3112 pc := ele.Value.(*persistConn)
3113 cl.ll.Remove(ele)
3114 delete(cl.m, pc)
3115 return pc
3116 }
3117
3118
3119 func (cl *connLRU) remove(pc *persistConn) {
3120 if ele, ok := cl.m[pc]; ok {
3121 cl.ll.Remove(ele)
3122 delete(cl.m, pc)
3123 }
3124 }
3125
3126
3127 func (cl *connLRU) len() int {
3128 return len(cl.m)
3129 }
3130
View as plain text