1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "math/rand/v2"
25 "reflect"
26 "runtime"
27 "sort"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32 )
33
34 var (
35 driversMu sync.RWMutex
36 drivers = make(map[string]driver.Driver)
37 )
38
39
40 var nowFunc = time.Now
41
42
43
44
45 func Register(name string, driver driver.Driver) {
46 driversMu.Lock()
47 defer driversMu.Unlock()
48 if driver == nil {
49 panic("sql: Register driver is nil")
50 }
51 if _, dup := drivers[name]; dup {
52 panic("sql: Register called twice for driver " + name)
53 }
54 drivers[name] = driver
55 }
56
57 func unregisterAllDrivers() {
58 driversMu.Lock()
59 defer driversMu.Unlock()
60
61 drivers = make(map[string]driver.Driver)
62 }
63
64
65 func Drivers() []string {
66 driversMu.RLock()
67 defer driversMu.RUnlock()
68 list := make([]string, 0, len(drivers))
69 for name := range drivers {
70 list = append(list, name)
71 }
72 sort.Strings(list)
73 return list
74 }
75
76
77
78
79
80
81
82 type NamedArg struct {
83 _NamedFieldsRequired struct{}
84
85
86
87
88
89
90
91 Name string
92
93
94
95
96 Value any
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111 func Named(name string, value any) NamedArg {
112
113
114
115
116 return NamedArg{Name: name, Value: value}
117 }
118
119
120 type IsolationLevel int
121
122
123
124
125
126 const (
127 LevelDefault IsolationLevel = iota
128 LevelReadUncommitted
129 LevelReadCommitted
130 LevelWriteCommitted
131 LevelRepeatableRead
132 LevelSnapshot
133 LevelSerializable
134 LevelLinearizable
135 )
136
137
138 func (i IsolationLevel) String() string {
139 switch i {
140 case LevelDefault:
141 return "Default"
142 case LevelReadUncommitted:
143 return "Read Uncommitted"
144 case LevelReadCommitted:
145 return "Read Committed"
146 case LevelWriteCommitted:
147 return "Write Committed"
148 case LevelRepeatableRead:
149 return "Repeatable Read"
150 case LevelSnapshot:
151 return "Snapshot"
152 case LevelSerializable:
153 return "Serializable"
154 case LevelLinearizable:
155 return "Linearizable"
156 default:
157 return "IsolationLevel(" + strconv.Itoa(int(i)) + ")"
158 }
159 }
160
161 var _ fmt.Stringer = LevelDefault
162
163
164 type TxOptions struct {
165
166
167 Isolation IsolationLevel
168 ReadOnly bool
169 }
170
171
172
173
174 type RawBytes []byte
175
176
177
178
179
180
181
182
183
184
185
186
187
188 type NullString struct {
189 String string
190 Valid bool
191 }
192
193
194 func (ns *NullString) Scan(value any) error {
195 if value == nil {
196 ns.String, ns.Valid = "", false
197 return nil
198 }
199 ns.Valid = true
200 return convertAssign(&ns.String, value)
201 }
202
203
204 func (ns NullString) Value() (driver.Value, error) {
205 if !ns.Valid {
206 return nil, nil
207 }
208 return ns.String, nil
209 }
210
211
212
213
214 type NullInt64 struct {
215 Int64 int64
216 Valid bool
217 }
218
219
220 func (n *NullInt64) Scan(value any) error {
221 if value == nil {
222 n.Int64, n.Valid = 0, false
223 return nil
224 }
225 n.Valid = true
226 return convertAssign(&n.Int64, value)
227 }
228
229
230 func (n NullInt64) Value() (driver.Value, error) {
231 if !n.Valid {
232 return nil, nil
233 }
234 return n.Int64, nil
235 }
236
237
238
239
240 type NullInt32 struct {
241 Int32 int32
242 Valid bool
243 }
244
245
246 func (n *NullInt32) Scan(value any) error {
247 if value == nil {
248 n.Int32, n.Valid = 0, false
249 return nil
250 }
251 n.Valid = true
252 return convertAssign(&n.Int32, value)
253 }
254
255
256 func (n NullInt32) Value() (driver.Value, error) {
257 if !n.Valid {
258 return nil, nil
259 }
260 return int64(n.Int32), nil
261 }
262
263
264
265
266 type NullInt16 struct {
267 Int16 int16
268 Valid bool
269 }
270
271
272 func (n *NullInt16) Scan(value any) error {
273 if value == nil {
274 n.Int16, n.Valid = 0, false
275 return nil
276 }
277 err := convertAssign(&n.Int16, value)
278 n.Valid = err == nil
279 return err
280 }
281
282
283 func (n NullInt16) Value() (driver.Value, error) {
284 if !n.Valid {
285 return nil, nil
286 }
287 return int64(n.Int16), nil
288 }
289
290
291
292
293 type NullByte struct {
294 Byte byte
295 Valid bool
296 }
297
298
299 func (n *NullByte) Scan(value any) error {
300 if value == nil {
301 n.Byte, n.Valid = 0, false
302 return nil
303 }
304 err := convertAssign(&n.Byte, value)
305 n.Valid = err == nil
306 return err
307 }
308
309
310 func (n NullByte) Value() (driver.Value, error) {
311 if !n.Valid {
312 return nil, nil
313 }
314 return int64(n.Byte), nil
315 }
316
317
318
319
320 type NullFloat64 struct {
321 Float64 float64
322 Valid bool
323 }
324
325
326 func (n *NullFloat64) Scan(value any) error {
327 if value == nil {
328 n.Float64, n.Valid = 0, false
329 return nil
330 }
331 n.Valid = true
332 return convertAssign(&n.Float64, value)
333 }
334
335
336 func (n NullFloat64) Value() (driver.Value, error) {
337 if !n.Valid {
338 return nil, nil
339 }
340 return n.Float64, nil
341 }
342
343
344
345
346 type NullBool struct {
347 Bool bool
348 Valid bool
349 }
350
351
352 func (n *NullBool) Scan(value any) error {
353 if value == nil {
354 n.Bool, n.Valid = false, false
355 return nil
356 }
357 n.Valid = true
358 return convertAssign(&n.Bool, value)
359 }
360
361
362 func (n NullBool) Value() (driver.Value, error) {
363 if !n.Valid {
364 return nil, nil
365 }
366 return n.Bool, nil
367 }
368
369
370
371
372 type NullTime struct {
373 Time time.Time
374 Valid bool
375 }
376
377
378 func (n *NullTime) Scan(value any) error {
379 if value == nil {
380 n.Time, n.Valid = time.Time{}, false
381 return nil
382 }
383 n.Valid = true
384 return convertAssign(&n.Time, value)
385 }
386
387
388 func (n NullTime) Value() (driver.Value, error) {
389 if !n.Valid {
390 return nil, nil
391 }
392 return n.Time, nil
393 }
394
395
396
397
398
399
400
401
402
403
404
405
406
407 type Null[T any] struct {
408 V T
409 Valid bool
410 }
411
412 func (n *Null[T]) Scan(value any) error {
413 if value == nil {
414 n.V, n.Valid = *new(T), false
415 return nil
416 }
417 n.Valid = true
418 return convertAssign(&n.V, value)
419 }
420
421 func (n Null[T]) Value() (driver.Value, error) {
422 if !n.Valid {
423 return nil, nil
424 }
425 return n.V, nil
426 }
427
428
429 type Scanner interface {
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448 Scan(src any) error
449 }
450
451
452
453
454
455
456
457
458
459 type Out struct {
460 _NamedFieldsRequired struct{}
461
462
463
464 Dest any
465
466
467
468
469 In bool
470 }
471
472
473
474
475 var ErrNoRows = errors.New("sql: no rows in result set")
476
477
478
479
480
481
482
483
484
485
486
487
488
489 type DB struct {
490
491 waitDuration atomic.Int64
492
493 connector driver.Connector
494
495
496
497 numClosed atomic.Uint64
498
499 mu sync.Mutex
500 freeConn []*driverConn
501 connRequests connRequestSet
502 numOpen int
503
504
505
506
507
508 openerCh chan struct{}
509 closed bool
510 dep map[finalCloser]depSet
511 lastPut map[*driverConn]string
512 maxIdleCount int
513 maxOpen int
514 maxLifetime time.Duration
515 maxIdleTime time.Duration
516 cleanerCh chan struct{}
517 waitCount int64
518 maxIdleClosed int64
519 maxIdleTimeClosed int64
520 maxLifetimeClosed int64
521
522 stop func()
523 }
524
525
526 type connReuseStrategy uint8
527
528 const (
529
530 alwaysNewConn connReuseStrategy = iota
531
532
533
534 cachedOrNewConn
535 )
536
537
538
539
540
541 type driverConn struct {
542 db *DB
543 createdAt time.Time
544
545 sync.Mutex
546 ci driver.Conn
547 needReset bool
548 closed bool
549 finalClosed bool
550 openStmt map[*driverStmt]bool
551
552
553 inUse bool
554 returnedAt time.Time
555 onPut []func()
556 dbmuClosed bool
557 }
558
559 func (dc *driverConn) releaseConn(err error) {
560 dc.db.putConn(dc, err, true)
561 }
562
563 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
564 dc.Lock()
565 defer dc.Unlock()
566 delete(dc.openStmt, ds)
567 }
568
569 func (dc *driverConn) expired(timeout time.Duration) bool {
570 if timeout <= 0 {
571 return false
572 }
573 return dc.createdAt.Add(timeout).Before(nowFunc())
574 }
575
576
577
578 func (dc *driverConn) resetSession(ctx context.Context) error {
579 dc.Lock()
580 defer dc.Unlock()
581
582 if !dc.needReset {
583 return nil
584 }
585 if cr, ok := dc.ci.(driver.SessionResetter); ok {
586 return cr.ResetSession(ctx)
587 }
588 return nil
589 }
590
591
592
593 func (dc *driverConn) validateConnection(needsReset bool) bool {
594 dc.Lock()
595 defer dc.Unlock()
596
597 if needsReset {
598 dc.needReset = true
599 }
600 if cv, ok := dc.ci.(driver.Validator); ok {
601 return cv.IsValid()
602 }
603 return true
604 }
605
606
607
608 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
609 si, err := ctxDriverPrepare(ctx, dc.ci, query)
610 if err != nil {
611 return nil, err
612 }
613 ds := &driverStmt{Locker: dc, si: si}
614
615
616 if cg != nil {
617 return ds, nil
618 }
619
620
621
622
623
624 if dc.openStmt == nil {
625 dc.openStmt = make(map[*driverStmt]bool)
626 }
627 dc.openStmt[ds] = true
628 return ds, nil
629 }
630
631
632 func (dc *driverConn) closeDBLocked() func() error {
633 dc.Lock()
634 defer dc.Unlock()
635 if dc.closed {
636 return func() error { return errors.New("sql: duplicate driverConn close") }
637 }
638 dc.closed = true
639 return dc.db.removeDepLocked(dc, dc)
640 }
641
642 func (dc *driverConn) Close() error {
643 dc.Lock()
644 if dc.closed {
645 dc.Unlock()
646 return errors.New("sql: duplicate driverConn close")
647 }
648 dc.closed = true
649 dc.Unlock()
650
651
652 dc.db.mu.Lock()
653 dc.dbmuClosed = true
654 fn := dc.db.removeDepLocked(dc, dc)
655 dc.db.mu.Unlock()
656 return fn()
657 }
658
659 func (dc *driverConn) finalClose() error {
660 var err error
661
662
663
664 var openStmt []*driverStmt
665 withLock(dc, func() {
666 openStmt = make([]*driverStmt, 0, len(dc.openStmt))
667 for ds := range dc.openStmt {
668 openStmt = append(openStmt, ds)
669 }
670 dc.openStmt = nil
671 })
672 for _, ds := range openStmt {
673 ds.Close()
674 }
675 withLock(dc, func() {
676 dc.finalClosed = true
677 err = dc.ci.Close()
678 dc.ci = nil
679 })
680
681 dc.db.mu.Lock()
682 dc.db.numOpen--
683 dc.db.maybeOpenNewConnections()
684 dc.db.mu.Unlock()
685
686 dc.db.numClosed.Add(1)
687 return err
688 }
689
690
691
692
693 type driverStmt struct {
694 sync.Locker
695 si driver.Stmt
696 closed bool
697 closeErr error
698 }
699
700
701
702 func (ds *driverStmt) Close() error {
703 ds.Lock()
704 defer ds.Unlock()
705 if ds.closed {
706 return ds.closeErr
707 }
708 ds.closed = true
709 ds.closeErr = ds.si.Close()
710 return ds.closeErr
711 }
712
713
714 type depSet map[any]bool
715
716
717
718 type finalCloser interface {
719
720
721 finalClose() error
722 }
723
724
725
726 func (db *DB) addDep(x finalCloser, dep any) {
727 db.mu.Lock()
728 defer db.mu.Unlock()
729 db.addDepLocked(x, dep)
730 }
731
732 func (db *DB) addDepLocked(x finalCloser, dep any) {
733 if db.dep == nil {
734 db.dep = make(map[finalCloser]depSet)
735 }
736 xdep := db.dep[x]
737 if xdep == nil {
738 xdep = make(depSet)
739 db.dep[x] = xdep
740 }
741 xdep[dep] = true
742 }
743
744
745
746
747
748 func (db *DB) removeDep(x finalCloser, dep any) error {
749 db.mu.Lock()
750 fn := db.removeDepLocked(x, dep)
751 db.mu.Unlock()
752 return fn()
753 }
754
755 func (db *DB) removeDepLocked(x finalCloser, dep any) func() error {
756 xdep, ok := db.dep[x]
757 if !ok {
758 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
759 }
760
761 l0 := len(xdep)
762 delete(xdep, dep)
763
764 switch len(xdep) {
765 case l0:
766
767 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
768 case 0:
769
770 delete(db.dep, x)
771 return x.finalClose
772 default:
773
774 return func() error { return nil }
775 }
776 }
777
778
779
780
781
782
783 var connectionRequestQueueSize = 1000000
784
785 type dsnConnector struct {
786 dsn string
787 driver driver.Driver
788 }
789
790 func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
791 return t.driver.Open(t.dsn)
792 }
793
794 func (t dsnConnector) Driver() driver.Driver {
795 return t.driver
796 }
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814 func OpenDB(c driver.Connector) *DB {
815 ctx, cancel := context.WithCancel(context.Background())
816 db := &DB{
817 connector: c,
818 openerCh: make(chan struct{}, connectionRequestQueueSize),
819 lastPut: make(map[*driverConn]string),
820 stop: cancel,
821 }
822
823 go db.connectionOpener(ctx)
824
825 return db
826 }
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845 func Open(driverName, dataSourceName string) (*DB, error) {
846 driversMu.RLock()
847 driveri, ok := drivers[driverName]
848 driversMu.RUnlock()
849 if !ok {
850 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
851 }
852
853 if driverCtx, ok := driveri.(driver.DriverContext); ok {
854 connector, err := driverCtx.OpenConnector(dataSourceName)
855 if err != nil {
856 return nil, err
857 }
858 return OpenDB(connector), nil
859 }
860
861 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
862 }
863
864 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
865 var err error
866 if pinger, ok := dc.ci.(driver.Pinger); ok {
867 withLock(dc, func() {
868 err = pinger.Ping(ctx)
869 })
870 }
871 release(err)
872 return err
873 }
874
875
876
877 func (db *DB) PingContext(ctx context.Context) error {
878 var dc *driverConn
879 var err error
880
881 err = db.retry(func(strategy connReuseStrategy) error {
882 dc, err = db.conn(ctx, strategy)
883 return err
884 })
885
886 if err != nil {
887 return err
888 }
889
890 return db.pingDC(ctx, dc, dc.releaseConn)
891 }
892
893
894
895
896
897
898 func (db *DB) Ping() error {
899 return db.PingContext(context.Background())
900 }
901
902
903
904
905
906
907
908 func (db *DB) Close() error {
909 db.mu.Lock()
910 if db.closed {
911 db.mu.Unlock()
912 return nil
913 }
914 if db.cleanerCh != nil {
915 close(db.cleanerCh)
916 }
917 var err error
918 fns := make([]func() error, 0, len(db.freeConn))
919 for _, dc := range db.freeConn {
920 fns = append(fns, dc.closeDBLocked())
921 }
922 db.freeConn = nil
923 db.closed = true
924 db.connRequests.CloseAndRemoveAll()
925 db.mu.Unlock()
926 for _, fn := range fns {
927 err1 := fn()
928 if err1 != nil {
929 err = err1
930 }
931 }
932 db.stop()
933 if c, ok := db.connector.(io.Closer); ok {
934 err1 := c.Close()
935 if err1 != nil {
936 err = err1
937 }
938 }
939 return err
940 }
941
942 const defaultMaxIdleConns = 2
943
944 func (db *DB) maxIdleConnsLocked() int {
945 n := db.maxIdleCount
946 switch {
947 case n == 0:
948
949 return defaultMaxIdleConns
950 case n < 0:
951 return 0
952 default:
953 return n
954 }
955 }
956
957 func (db *DB) shortestIdleTimeLocked() time.Duration {
958 if db.maxIdleTime <= 0 {
959 return db.maxLifetime
960 }
961 if db.maxLifetime <= 0 {
962 return db.maxIdleTime
963 }
964 return min(db.maxIdleTime, db.maxLifetime)
965 }
966
967
968
969
970
971
972
973
974
975
976
977 func (db *DB) SetMaxIdleConns(n int) {
978 db.mu.Lock()
979 if n > 0 {
980 db.maxIdleCount = n
981 } else {
982
983 db.maxIdleCount = -1
984 }
985
986 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
987 db.maxIdleCount = db.maxOpen
988 }
989 var closing []*driverConn
990 idleCount := len(db.freeConn)
991 maxIdle := db.maxIdleConnsLocked()
992 if idleCount > maxIdle {
993 closing = db.freeConn[maxIdle:]
994 db.freeConn = db.freeConn[:maxIdle]
995 }
996 db.maxIdleClosed += int64(len(closing))
997 db.mu.Unlock()
998 for _, c := range closing {
999 c.Close()
1000 }
1001 }
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011 func (db *DB) SetMaxOpenConns(n int) {
1012 db.mu.Lock()
1013 db.maxOpen = n
1014 if n < 0 {
1015 db.maxOpen = 0
1016 }
1017 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
1018 db.mu.Unlock()
1019 if syncMaxIdle {
1020 db.SetMaxIdleConns(n)
1021 }
1022 }
1023
1024
1025
1026
1027
1028
1029 func (db *DB) SetConnMaxLifetime(d time.Duration) {
1030 if d < 0 {
1031 d = 0
1032 }
1033 db.mu.Lock()
1034
1035 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
1036 select {
1037 case db.cleanerCh <- struct{}{}:
1038 default:
1039 }
1040 }
1041 db.maxLifetime = d
1042 db.startCleanerLocked()
1043 db.mu.Unlock()
1044 }
1045
1046
1047
1048
1049
1050
1051 func (db *DB) SetConnMaxIdleTime(d time.Duration) {
1052 if d < 0 {
1053 d = 0
1054 }
1055 db.mu.Lock()
1056 defer db.mu.Unlock()
1057
1058
1059 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
1060 select {
1061 case db.cleanerCh <- struct{}{}:
1062 default:
1063 }
1064 }
1065 db.maxIdleTime = d
1066 db.startCleanerLocked()
1067 }
1068
1069
1070 func (db *DB) startCleanerLocked() {
1071 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
1072 db.cleanerCh = make(chan struct{}, 1)
1073 go db.connectionCleaner(db.shortestIdleTimeLocked())
1074 }
1075 }
1076
1077 func (db *DB) connectionCleaner(d time.Duration) {
1078 const minInterval = time.Second
1079
1080 if d < minInterval {
1081 d = minInterval
1082 }
1083 t := time.NewTimer(d)
1084
1085 for {
1086 select {
1087 case <-t.C:
1088 case <-db.cleanerCh:
1089 }
1090
1091 db.mu.Lock()
1092
1093 d = db.shortestIdleTimeLocked()
1094 if db.closed || db.numOpen == 0 || d <= 0 {
1095 db.cleanerCh = nil
1096 db.mu.Unlock()
1097 return
1098 }
1099
1100 d, closing := db.connectionCleanerRunLocked(d)
1101 db.mu.Unlock()
1102 for _, c := range closing {
1103 c.Close()
1104 }
1105
1106 if d < minInterval {
1107 d = minInterval
1108 }
1109
1110 if !t.Stop() {
1111 select {
1112 case <-t.C:
1113 default:
1114 }
1115 }
1116 t.Reset(d)
1117 }
1118 }
1119
1120
1121
1122
1123 func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
1124 var idleClosing int64
1125 var closing []*driverConn
1126 if db.maxIdleTime > 0 {
1127
1128
1129 idleSince := nowFunc().Add(-db.maxIdleTime)
1130 last := len(db.freeConn) - 1
1131 for i := last; i >= 0; i-- {
1132 c := db.freeConn[i]
1133 if c.returnedAt.Before(idleSince) {
1134 i++
1135 closing = db.freeConn[:i:i]
1136 db.freeConn = db.freeConn[i:]
1137 idleClosing = int64(len(closing))
1138 db.maxIdleTimeClosed += idleClosing
1139 break
1140 }
1141 }
1142
1143 if len(db.freeConn) > 0 {
1144 c := db.freeConn[0]
1145 if d2 := c.returnedAt.Sub(idleSince); d2 < d {
1146
1147
1148 d = d2
1149 }
1150 }
1151 }
1152
1153 if db.maxLifetime > 0 {
1154 expiredSince := nowFunc().Add(-db.maxLifetime)
1155 for i := 0; i < len(db.freeConn); i++ {
1156 c := db.freeConn[i]
1157 if c.createdAt.Before(expiredSince) {
1158 closing = append(closing, c)
1159
1160 last := len(db.freeConn) - 1
1161
1162
1163 copy(db.freeConn[i:], db.freeConn[i+1:])
1164 db.freeConn[last] = nil
1165 db.freeConn = db.freeConn[:last]
1166 i--
1167 } else if d2 := c.createdAt.Sub(expiredSince); d2 < d {
1168
1169
1170 d = d2
1171 }
1172 }
1173 db.maxLifetimeClosed += int64(len(closing)) - idleClosing
1174 }
1175
1176 return d, closing
1177 }
1178
1179
1180 type DBStats struct {
1181 MaxOpenConnections int
1182
1183
1184 OpenConnections int
1185 InUse int
1186 Idle int
1187
1188
1189 WaitCount int64
1190 WaitDuration time.Duration
1191 MaxIdleClosed int64
1192 MaxIdleTimeClosed int64
1193 MaxLifetimeClosed int64
1194 }
1195
1196
1197 func (db *DB) Stats() DBStats {
1198 wait := db.waitDuration.Load()
1199
1200 db.mu.Lock()
1201 defer db.mu.Unlock()
1202
1203 stats := DBStats{
1204 MaxOpenConnections: db.maxOpen,
1205
1206 Idle: len(db.freeConn),
1207 OpenConnections: db.numOpen,
1208 InUse: db.numOpen - len(db.freeConn),
1209
1210 WaitCount: db.waitCount,
1211 WaitDuration: time.Duration(wait),
1212 MaxIdleClosed: db.maxIdleClosed,
1213 MaxIdleTimeClosed: db.maxIdleTimeClosed,
1214 MaxLifetimeClosed: db.maxLifetimeClosed,
1215 }
1216 return stats
1217 }
1218
1219
1220
1221
1222 func (db *DB) maybeOpenNewConnections() {
1223 numRequests := db.connRequests.Len()
1224 if db.maxOpen > 0 {
1225 numCanOpen := db.maxOpen - db.numOpen
1226 if numRequests > numCanOpen {
1227 numRequests = numCanOpen
1228 }
1229 }
1230 for numRequests > 0 {
1231 db.numOpen++
1232 numRequests--
1233 if db.closed {
1234 return
1235 }
1236 db.openerCh <- struct{}{}
1237 }
1238 }
1239
1240
1241 func (db *DB) connectionOpener(ctx context.Context) {
1242 for {
1243 select {
1244 case <-ctx.Done():
1245 return
1246 case <-db.openerCh:
1247 db.openNewConnection(ctx)
1248 }
1249 }
1250 }
1251
1252
1253 func (db *DB) openNewConnection(ctx context.Context) {
1254
1255
1256
1257 ci, err := db.connector.Connect(ctx)
1258 db.mu.Lock()
1259 defer db.mu.Unlock()
1260 if db.closed {
1261 if err == nil {
1262 ci.Close()
1263 }
1264 db.numOpen--
1265 return
1266 }
1267 if err != nil {
1268 db.numOpen--
1269 db.putConnDBLocked(nil, err)
1270 db.maybeOpenNewConnections()
1271 return
1272 }
1273 dc := &driverConn{
1274 db: db,
1275 createdAt: nowFunc(),
1276 returnedAt: nowFunc(),
1277 ci: ci,
1278 }
1279 if db.putConnDBLocked(dc, err) {
1280 db.addDepLocked(dc, dc)
1281 } else {
1282 db.numOpen--
1283 ci.Close()
1284 }
1285 }
1286
1287
1288
1289
1290 type connRequest struct {
1291 conn *driverConn
1292 err error
1293 }
1294
1295 var errDBClosed = errors.New("sql: database is closed")
1296
1297
1298 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
1299 db.mu.Lock()
1300 if db.closed {
1301 db.mu.Unlock()
1302 return nil, errDBClosed
1303 }
1304
1305 select {
1306 default:
1307 case <-ctx.Done():
1308 db.mu.Unlock()
1309 return nil, ctx.Err()
1310 }
1311 lifetime := db.maxLifetime
1312
1313
1314 last := len(db.freeConn) - 1
1315 if strategy == cachedOrNewConn && last >= 0 {
1316
1317
1318 conn := db.freeConn[last]
1319 db.freeConn = db.freeConn[:last]
1320 conn.inUse = true
1321 if conn.expired(lifetime) {
1322 db.maxLifetimeClosed++
1323 db.mu.Unlock()
1324 conn.Close()
1325 return nil, driver.ErrBadConn
1326 }
1327 db.mu.Unlock()
1328
1329
1330 if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
1331 conn.Close()
1332 return nil, err
1333 }
1334
1335 return conn, nil
1336 }
1337
1338
1339
1340 if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
1341
1342
1343 req := make(chan connRequest, 1)
1344 delHandle := db.connRequests.Add(req)
1345 db.waitCount++
1346 db.mu.Unlock()
1347
1348 waitStart := nowFunc()
1349
1350
1351 select {
1352 case <-ctx.Done():
1353
1354
1355 db.mu.Lock()
1356 deleted := db.connRequests.Delete(delHandle)
1357 db.mu.Unlock()
1358
1359 db.waitDuration.Add(int64(time.Since(waitStart)))
1360
1361
1362
1363 if !deleted {
1364
1365
1366
1367
1368
1369
1370 select {
1371 default:
1372 case ret, ok := <-req:
1373 if ok && ret.conn != nil {
1374 db.putConn(ret.conn, ret.err, false)
1375 }
1376 }
1377 }
1378 return nil, ctx.Err()
1379 case ret, ok := <-req:
1380 db.waitDuration.Add(int64(time.Since(waitStart)))
1381
1382 if !ok {
1383 return nil, errDBClosed
1384 }
1385
1386
1387
1388
1389
1390
1391 if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
1392 db.mu.Lock()
1393 db.maxLifetimeClosed++
1394 db.mu.Unlock()
1395 ret.conn.Close()
1396 return nil, driver.ErrBadConn
1397 }
1398 if ret.conn == nil {
1399 return nil, ret.err
1400 }
1401
1402
1403 if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
1404 ret.conn.Close()
1405 return nil, err
1406 }
1407 return ret.conn, ret.err
1408 }
1409 }
1410
1411 db.numOpen++
1412 db.mu.Unlock()
1413 ci, err := db.connector.Connect(ctx)
1414 if err != nil {
1415 db.mu.Lock()
1416 db.numOpen--
1417 db.maybeOpenNewConnections()
1418 db.mu.Unlock()
1419 return nil, err
1420 }
1421 db.mu.Lock()
1422 dc := &driverConn{
1423 db: db,
1424 createdAt: nowFunc(),
1425 returnedAt: nowFunc(),
1426 ci: ci,
1427 inUse: true,
1428 }
1429 db.addDepLocked(dc, dc)
1430 db.mu.Unlock()
1431 return dc, nil
1432 }
1433
1434
1435 var putConnHook func(*DB, *driverConn)
1436
1437
1438
1439
1440 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1441 db.mu.Lock()
1442 defer db.mu.Unlock()
1443 if c.inUse {
1444 c.onPut = append(c.onPut, func() {
1445 ds.Close()
1446 })
1447 } else {
1448 c.Lock()
1449 fc := c.finalClosed
1450 c.Unlock()
1451 if !fc {
1452 ds.Close()
1453 }
1454 }
1455 }
1456
1457
1458
1459 const debugGetPut = false
1460
1461
1462
1463 func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
1464 if !errors.Is(err, driver.ErrBadConn) {
1465 if !dc.validateConnection(resetSession) {
1466 err = driver.ErrBadConn
1467 }
1468 }
1469 db.mu.Lock()
1470 if !dc.inUse {
1471 db.mu.Unlock()
1472 if debugGetPut {
1473 fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
1474 }
1475 panic("sql: connection returned that was never out")
1476 }
1477
1478 if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
1479 db.maxLifetimeClosed++
1480 err = driver.ErrBadConn
1481 }
1482 if debugGetPut {
1483 db.lastPut[dc] = stack()
1484 }
1485 dc.inUse = false
1486 dc.returnedAt = nowFunc()
1487
1488 for _, fn := range dc.onPut {
1489 fn()
1490 }
1491 dc.onPut = nil
1492
1493 if errors.Is(err, driver.ErrBadConn) {
1494
1495
1496
1497
1498 db.maybeOpenNewConnections()
1499 db.mu.Unlock()
1500 dc.Close()
1501 return
1502 }
1503 if putConnHook != nil {
1504 putConnHook(db, dc)
1505 }
1506 added := db.putConnDBLocked(dc, nil)
1507 db.mu.Unlock()
1508
1509 if !added {
1510 dc.Close()
1511 return
1512 }
1513 }
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
1525 if db.closed {
1526 return false
1527 }
1528 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
1529 return false
1530 }
1531 if req, ok := db.connRequests.TakeRandom(); ok {
1532 if err == nil {
1533 dc.inUse = true
1534 }
1535 req <- connRequest{
1536 conn: dc,
1537 err: err,
1538 }
1539 return true
1540 } else if err == nil && !db.closed {
1541 if db.maxIdleConnsLocked() > len(db.freeConn) {
1542 db.freeConn = append(db.freeConn, dc)
1543 db.startCleanerLocked()
1544 return true
1545 }
1546 db.maxIdleClosed++
1547 }
1548 return false
1549 }
1550
1551
1552
1553
1554 const maxBadConnRetries = 2
1555
1556 func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
1557 for i := int64(0); i < maxBadConnRetries; i++ {
1558 err := fn(cachedOrNewConn)
1559
1560 if err == nil || !errors.Is(err, driver.ErrBadConn) {
1561 return err
1562 }
1563 }
1564
1565 return fn(alwaysNewConn)
1566 }
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1577 var stmt *Stmt
1578 var err error
1579
1580 err = db.retry(func(strategy connReuseStrategy) error {
1581 stmt, err = db.prepare(ctx, query, strategy)
1582 return err
1583 })
1584
1585 return stmt, err
1586 }
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596 func (db *DB) Prepare(query string) (*Stmt, error) {
1597 return db.PrepareContext(context.Background(), query)
1598 }
1599
1600 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
1601
1602
1603
1604
1605
1606
1607 dc, err := db.conn(ctx, strategy)
1608 if err != nil {
1609 return nil, err
1610 }
1611 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
1612 }
1613
1614
1615
1616
1617 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
1618 var ds *driverStmt
1619 var err error
1620 defer func() {
1621 release(err)
1622 }()
1623 withLock(dc, func() {
1624 ds, err = dc.prepareLocked(ctx, cg, query)
1625 })
1626 if err != nil {
1627 return nil, err
1628 }
1629 stmt := &Stmt{
1630 db: db,
1631 query: query,
1632 cg: cg,
1633 cgds: ds,
1634 }
1635
1636
1637
1638
1639 if cg == nil {
1640 stmt.css = []connStmt{{dc, ds}}
1641 stmt.lastNumClosed = db.numClosed.Load()
1642 db.addDep(stmt, stmt)
1643 }
1644 return stmt, nil
1645 }
1646
1647
1648
1649 func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
1650 var res Result
1651 var err error
1652
1653 err = db.retry(func(strategy connReuseStrategy) error {
1654 res, err = db.exec(ctx, query, args, strategy)
1655 return err
1656 })
1657
1658 return res, err
1659 }
1660
1661
1662
1663
1664
1665
1666 func (db *DB) Exec(query string, args ...any) (Result, error) {
1667 return db.ExecContext(context.Background(), query, args...)
1668 }
1669
1670 func (db *DB) exec(ctx context.Context, query string, args []any, strategy connReuseStrategy) (Result, error) {
1671 dc, err := db.conn(ctx, strategy)
1672 if err != nil {
1673 return nil, err
1674 }
1675 return db.execDC(ctx, dc, dc.releaseConn, query, args)
1676 }
1677
1678 func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) {
1679 defer func() {
1680 release(err)
1681 }()
1682 execerCtx, ok := dc.ci.(driver.ExecerContext)
1683 var execer driver.Execer
1684 if !ok {
1685 execer, ok = dc.ci.(driver.Execer)
1686 }
1687 if ok {
1688 var nvdargs []driver.NamedValue
1689 var resi driver.Result
1690 withLock(dc, func() {
1691 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1692 if err != nil {
1693 return
1694 }
1695 resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
1696 })
1697 if err != driver.ErrSkip {
1698 if err != nil {
1699 return nil, err
1700 }
1701 return driverResult{dc, resi}, nil
1702 }
1703 }
1704
1705 var si driver.Stmt
1706 withLock(dc, func() {
1707 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1708 })
1709 if err != nil {
1710 return nil, err
1711 }
1712 ds := &driverStmt{Locker: dc, si: si}
1713 defer ds.Close()
1714 return resultFromStatement(ctx, dc.ci, ds, args...)
1715 }
1716
1717
1718
1719 func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
1720 var rows *Rows
1721 var err error
1722
1723 err = db.retry(func(strategy connReuseStrategy) error {
1724 rows, err = db.query(ctx, query, args, strategy)
1725 return err
1726 })
1727
1728 return rows, err
1729 }
1730
1731
1732
1733
1734
1735
1736 func (db *DB) Query(query string, args ...any) (*Rows, error) {
1737 return db.QueryContext(context.Background(), query, args...)
1738 }
1739
1740 func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
1741 dc, err := db.conn(ctx, strategy)
1742 if err != nil {
1743 return nil, err
1744 }
1745
1746 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
1747 }
1748
1749
1750
1751
1752
1753 func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) {
1754 queryerCtx, ok := dc.ci.(driver.QueryerContext)
1755 var queryer driver.Queryer
1756 if !ok {
1757 queryer, ok = dc.ci.(driver.Queryer)
1758 }
1759 if ok {
1760 var nvdargs []driver.NamedValue
1761 var rowsi driver.Rows
1762 var err error
1763 withLock(dc, func() {
1764 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1765 if err != nil {
1766 return
1767 }
1768 rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
1769 })
1770 if err != driver.ErrSkip {
1771 if err != nil {
1772 releaseConn(err)
1773 return nil, err
1774 }
1775
1776
1777 rows := &Rows{
1778 dc: dc,
1779 releaseConn: releaseConn,
1780 rowsi: rowsi,
1781 }
1782 rows.initContextClose(ctx, txctx)
1783 return rows, nil
1784 }
1785 }
1786
1787 var si driver.Stmt
1788 var err error
1789 withLock(dc, func() {
1790 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1791 })
1792 if err != nil {
1793 releaseConn(err)
1794 return nil, err
1795 }
1796
1797 ds := &driverStmt{Locker: dc, si: si}
1798 rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
1799 if err != nil {
1800 ds.Close()
1801 releaseConn(err)
1802 return nil, err
1803 }
1804
1805
1806
1807 rows := &Rows{
1808 dc: dc,
1809 releaseConn: releaseConn,
1810 rowsi: rowsi,
1811 closeStmt: ds,
1812 }
1813 rows.initContextClose(ctx, txctx)
1814 return rows, nil
1815 }
1816
1817
1818
1819
1820
1821
1822
1823 func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
1824 rows, err := db.QueryContext(ctx, query, args...)
1825 return &Row{rows: rows, err: err}
1826 }
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837 func (db *DB) QueryRow(query string, args ...any) *Row {
1838 return db.QueryRowContext(context.Background(), query, args...)
1839 }
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1852 var tx *Tx
1853 var err error
1854
1855 err = db.retry(func(strategy connReuseStrategy) error {
1856 tx, err = db.begin(ctx, opts, strategy)
1857 return err
1858 })
1859
1860 return tx, err
1861 }
1862
1863
1864
1865
1866
1867
1868 func (db *DB) Begin() (*Tx, error) {
1869 return db.BeginTx(context.Background(), nil)
1870 }
1871
1872 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1873 dc, err := db.conn(ctx, strategy)
1874 if err != nil {
1875 return nil, err
1876 }
1877 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1878 }
1879
1880
1881 func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
1882 var txi driver.Tx
1883 keepConnOnRollback := false
1884 withLock(dc, func() {
1885 _, hasSessionResetter := dc.ci.(driver.SessionResetter)
1886 _, hasConnectionValidator := dc.ci.(driver.Validator)
1887 keepConnOnRollback = hasSessionResetter && hasConnectionValidator
1888 txi, err = ctxDriverBegin(ctx, opts, dc.ci)
1889 })
1890 if err != nil {
1891 release(err)
1892 return nil, err
1893 }
1894
1895
1896
1897 ctx, cancel := context.WithCancel(ctx)
1898 tx = &Tx{
1899 db: db,
1900 dc: dc,
1901 releaseConn: release,
1902 txi: txi,
1903 cancel: cancel,
1904 keepConnOnRollback: keepConnOnRollback,
1905 ctx: ctx,
1906 }
1907 go tx.awaitDone()
1908 return tx, nil
1909 }
1910
1911
1912 func (db *DB) Driver() driver.Driver {
1913 return db.connector.Driver()
1914 }
1915
1916
1917
1918 var ErrConnDone = errors.New("sql: connection is already closed")
1919
1920
1921
1922
1923
1924
1925
1926
1927 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1928 var dc *driverConn
1929 var err error
1930
1931 err = db.retry(func(strategy connReuseStrategy) error {
1932 dc, err = db.conn(ctx, strategy)
1933 return err
1934 })
1935
1936 if err != nil {
1937 return nil, err
1938 }
1939
1940 conn := &Conn{
1941 db: db,
1942 dc: dc,
1943 }
1944 return conn, nil
1945 }
1946
1947 type releaseConn func(error)
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958 type Conn struct {
1959 db *DB
1960
1961
1962
1963
1964 closemu sync.RWMutex
1965
1966
1967
1968 dc *driverConn
1969
1970
1971
1972 done atomic.Bool
1973
1974 releaseConnOnce sync.Once
1975
1976
1977 releaseConnCache releaseConn
1978 }
1979
1980
1981
1982 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1983 if c.done.Load() {
1984 return nil, nil, ErrConnDone
1985 }
1986 c.releaseConnOnce.Do(func() {
1987 c.releaseConnCache = c.closemuRUnlockCondReleaseConn
1988 })
1989 c.closemu.RLock()
1990 return c.dc, c.releaseConnCache, nil
1991 }
1992
1993
1994 func (c *Conn) PingContext(ctx context.Context) error {
1995 dc, release, err := c.grabConn(ctx)
1996 if err != nil {
1997 return err
1998 }
1999 return c.db.pingDC(ctx, dc, release)
2000 }
2001
2002
2003
2004 func (c *Conn) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
2005 dc, release, err := c.grabConn(ctx)
2006 if err != nil {
2007 return nil, err
2008 }
2009 return c.db.execDC(ctx, dc, release, query, args)
2010 }
2011
2012
2013
2014 func (c *Conn) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2015 dc, release, err := c.grabConn(ctx)
2016 if err != nil {
2017 return nil, err
2018 }
2019 return c.db.queryDC(ctx, nil, dc, release, query, args)
2020 }
2021
2022
2023
2024
2025
2026
2027
2028 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2029 rows, err := c.QueryContext(ctx, query, args...)
2030 return &Row{rows: rows, err: err}
2031 }
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2042 dc, release, err := c.grabConn(ctx)
2043 if err != nil {
2044 return nil, err
2045 }
2046 return c.db.prepareDC(ctx, dc, release, c, query)
2047 }
2048
2049
2050
2051
2052
2053
2054 func (c *Conn) Raw(f func(driverConn any) error) (err error) {
2055 var dc *driverConn
2056 var release releaseConn
2057
2058
2059 dc, release, err = c.grabConn(nil)
2060 if err != nil {
2061 return
2062 }
2063 fPanic := true
2064 dc.Mutex.Lock()
2065 defer func() {
2066 dc.Mutex.Unlock()
2067
2068
2069
2070
2071 if fPanic {
2072 err = driver.ErrBadConn
2073 }
2074 release(err)
2075 }()
2076 err = f(dc.ci)
2077 fPanic = false
2078
2079 return
2080 }
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
2093 dc, release, err := c.grabConn(ctx)
2094 if err != nil {
2095 return nil, err
2096 }
2097 return c.db.beginDC(ctx, dc, release, opts)
2098 }
2099
2100
2101
2102 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
2103 c.closemu.RUnlock()
2104 if errors.Is(err, driver.ErrBadConn) {
2105 c.close(err)
2106 }
2107 }
2108
2109 func (c *Conn) txCtx() context.Context {
2110 return nil
2111 }
2112
2113 func (c *Conn) close(err error) error {
2114 if !c.done.CompareAndSwap(false, true) {
2115 return ErrConnDone
2116 }
2117
2118
2119
2120 c.closemu.Lock()
2121 defer c.closemu.Unlock()
2122
2123 c.dc.releaseConn(err)
2124 c.dc = nil
2125 c.db = nil
2126 return err
2127 }
2128
2129
2130
2131
2132
2133
2134 func (c *Conn) Close() error {
2135 return c.close(nil)
2136 }
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148 type Tx struct {
2149 db *DB
2150
2151
2152
2153
2154 closemu sync.RWMutex
2155
2156
2157
2158 dc *driverConn
2159 txi driver.Tx
2160
2161
2162
2163 releaseConn func(error)
2164
2165
2166
2167
2168 done atomic.Bool
2169
2170
2171
2172
2173 keepConnOnRollback bool
2174
2175
2176
2177 stmts struct {
2178 sync.Mutex
2179 v []*Stmt
2180 }
2181
2182
2183 cancel func()
2184
2185
2186 ctx context.Context
2187 }
2188
2189
2190
2191 func (tx *Tx) awaitDone() {
2192
2193
2194 <-tx.ctx.Done()
2195
2196
2197
2198
2199
2200
2201
2202 discardConnection := !tx.keepConnOnRollback
2203 tx.rollback(discardConnection)
2204 }
2205
2206 func (tx *Tx) isDone() bool {
2207 return tx.done.Load()
2208 }
2209
2210
2211
2212 var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
2213
2214
2215
2216
2217 func (tx *Tx) close(err error) {
2218 tx.releaseConn(err)
2219 tx.dc = nil
2220 tx.txi = nil
2221 }
2222
2223
2224
2225 var hookTxGrabConn func()
2226
2227 func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
2228 select {
2229 default:
2230 case <-ctx.Done():
2231 return nil, nil, ctx.Err()
2232 }
2233
2234
2235
2236 tx.closemu.RLock()
2237 if tx.isDone() {
2238 tx.closemu.RUnlock()
2239 return nil, nil, ErrTxDone
2240 }
2241 if hookTxGrabConn != nil {
2242 hookTxGrabConn()
2243 }
2244 return tx.dc, tx.closemuRUnlockRelease, nil
2245 }
2246
2247 func (tx *Tx) txCtx() context.Context {
2248 return tx.ctx
2249 }
2250
2251
2252
2253
2254
2255 func (tx *Tx) closemuRUnlockRelease(error) {
2256 tx.closemu.RUnlock()
2257 }
2258
2259
2260 func (tx *Tx) closePrepared() {
2261 tx.stmts.Lock()
2262 defer tx.stmts.Unlock()
2263 for _, stmt := range tx.stmts.v {
2264 stmt.Close()
2265 }
2266 }
2267
2268
2269 func (tx *Tx) Commit() error {
2270
2271
2272
2273 select {
2274 default:
2275 case <-tx.ctx.Done():
2276 if tx.done.Load() {
2277 return ErrTxDone
2278 }
2279 return tx.ctx.Err()
2280 }
2281 if !tx.done.CompareAndSwap(false, true) {
2282 return ErrTxDone
2283 }
2284
2285
2286
2287
2288
2289 tx.cancel()
2290 tx.closemu.Lock()
2291 tx.closemu.Unlock()
2292
2293 var err error
2294 withLock(tx.dc, func() {
2295 err = tx.txi.Commit()
2296 })
2297 if !errors.Is(err, driver.ErrBadConn) {
2298 tx.closePrepared()
2299 }
2300 tx.close(err)
2301 return err
2302 }
2303
2304 var rollbackHook func()
2305
2306
2307
2308 func (tx *Tx) rollback(discardConn bool) error {
2309 if !tx.done.CompareAndSwap(false, true) {
2310 return ErrTxDone
2311 }
2312
2313 if rollbackHook != nil {
2314 rollbackHook()
2315 }
2316
2317
2318
2319
2320
2321 tx.cancel()
2322 tx.closemu.Lock()
2323 tx.closemu.Unlock()
2324
2325 var err error
2326 withLock(tx.dc, func() {
2327 err = tx.txi.Rollback()
2328 })
2329 if !errors.Is(err, driver.ErrBadConn) {
2330 tx.closePrepared()
2331 }
2332 if discardConn {
2333 err = driver.ErrBadConn
2334 }
2335 tx.close(err)
2336 return err
2337 }
2338
2339
2340 func (tx *Tx) Rollback() error {
2341 return tx.rollback(false)
2342 }
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2355 dc, release, err := tx.grabConn(ctx)
2356 if err != nil {
2357 return nil, err
2358 }
2359
2360 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
2361 if err != nil {
2362 return nil, err
2363 }
2364 tx.stmts.Lock()
2365 tx.stmts.v = append(tx.stmts.v, stmt)
2366 tx.stmts.Unlock()
2367 return stmt, nil
2368 }
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379 func (tx *Tx) Prepare(query string) (*Stmt, error) {
2380 return tx.PrepareContext(context.Background(), query)
2381 }
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399 func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
2400 dc, release, err := tx.grabConn(ctx)
2401 if err != nil {
2402 return &Stmt{stickyErr: err}
2403 }
2404 defer release(nil)
2405
2406 if tx.db != stmt.db {
2407 return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
2408 }
2409 var si driver.Stmt
2410 var parentStmt *Stmt
2411 stmt.mu.Lock()
2412 if stmt.closed || stmt.cg != nil {
2413
2414
2415
2416
2417
2418
2419 stmt.mu.Unlock()
2420 withLock(dc, func() {
2421 si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
2422 })
2423 if err != nil {
2424 return &Stmt{stickyErr: err}
2425 }
2426 } else {
2427 stmt.removeClosedStmtLocked()
2428
2429
2430 for _, v := range stmt.css {
2431 if v.dc == dc {
2432 si = v.ds.si
2433 break
2434 }
2435 }
2436
2437 stmt.mu.Unlock()
2438
2439 if si == nil {
2440 var ds *driverStmt
2441 withLock(dc, func() {
2442 ds, err = stmt.prepareOnConnLocked(ctx, dc)
2443 })
2444 if err != nil {
2445 return &Stmt{stickyErr: err}
2446 }
2447 si = ds.si
2448 }
2449 parentStmt = stmt
2450 }
2451
2452 txs := &Stmt{
2453 db: tx.db,
2454 cg: tx,
2455 cgds: &driverStmt{
2456 Locker: dc,
2457 si: si,
2458 },
2459 parentStmt: parentStmt,
2460 query: stmt.query,
2461 }
2462 if parentStmt != nil {
2463 tx.db.addDep(parentStmt, txs)
2464 }
2465 tx.stmts.Lock()
2466 tx.stmts.v = append(tx.stmts.v, txs)
2467 tx.stmts.Unlock()
2468 return txs
2469 }
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2488 return tx.StmtContext(context.Background(), stmt)
2489 }
2490
2491
2492
2493 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
2494 dc, release, err := tx.grabConn(ctx)
2495 if err != nil {
2496 return nil, err
2497 }
2498 return tx.db.execDC(ctx, dc, release, query, args)
2499 }
2500
2501
2502
2503
2504
2505
2506 func (tx *Tx) Exec(query string, args ...any) (Result, error) {
2507 return tx.ExecContext(context.Background(), query, args...)
2508 }
2509
2510
2511 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2512 dc, release, err := tx.grabConn(ctx)
2513 if err != nil {
2514 return nil, err
2515 }
2516
2517 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2518 }
2519
2520
2521
2522
2523
2524 func (tx *Tx) Query(query string, args ...any) (*Rows, error) {
2525 return tx.QueryContext(context.Background(), query, args...)
2526 }
2527
2528
2529
2530
2531
2532
2533
2534 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2535 rows, err := tx.QueryContext(ctx, query, args...)
2536 return &Row{rows: rows, err: err}
2537 }
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548 func (tx *Tx) QueryRow(query string, args ...any) *Row {
2549 return tx.QueryRowContext(context.Background(), query, args...)
2550 }
2551
2552
2553 type connStmt struct {
2554 dc *driverConn
2555 ds *driverStmt
2556 }
2557
2558
2559
2560 type stmtConnGrabber interface {
2561
2562
2563 grabConn(context.Context) (*driverConn, releaseConn, error)
2564
2565
2566
2567
2568 txCtx() context.Context
2569 }
2570
2571 var (
2572 _ stmtConnGrabber = &Tx{}
2573 _ stmtConnGrabber = &Conn{}
2574 )
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585 type Stmt struct {
2586
2587 db *DB
2588 query string
2589 stickyErr error
2590
2591 closemu sync.RWMutex
2592
2593
2594
2595
2596
2597
2598 cg stmtConnGrabber
2599 cgds *driverStmt
2600
2601
2602
2603
2604
2605
2606
2607 parentStmt *Stmt
2608
2609 mu sync.Mutex
2610 closed bool
2611
2612
2613
2614
2615
2616 css []connStmt
2617
2618
2619
2620 lastNumClosed uint64
2621 }
2622
2623
2624
2625 func (s *Stmt) ExecContext(ctx context.Context, args ...any) (Result, error) {
2626 s.closemu.RLock()
2627 defer s.closemu.RUnlock()
2628
2629 var res Result
2630 err := s.db.retry(func(strategy connReuseStrategy) error {
2631 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2632 if err != nil {
2633 return err
2634 }
2635
2636 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2637 releaseConn(err)
2638 return err
2639 })
2640
2641 return res, err
2642 }
2643
2644
2645
2646
2647
2648
2649 func (s *Stmt) Exec(args ...any) (Result, error) {
2650 return s.ExecContext(context.Background(), args...)
2651 }
2652
2653 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (Result, error) {
2654 ds.Lock()
2655 defer ds.Unlock()
2656
2657 dargs, err := driverArgsConnLocked(ci, ds, args)
2658 if err != nil {
2659 return nil, err
2660 }
2661
2662 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2663 if err != nil {
2664 return nil, err
2665 }
2666 return driverResult{ds.Locker, resi}, nil
2667 }
2668
2669
2670
2671
2672
2673 func (s *Stmt) removeClosedStmtLocked() {
2674 t := len(s.css)/2 + 1
2675 if t > 10 {
2676 t = 10
2677 }
2678 dbClosed := s.db.numClosed.Load()
2679 if dbClosed-s.lastNumClosed < uint64(t) {
2680 return
2681 }
2682
2683 s.db.mu.Lock()
2684 for i := 0; i < len(s.css); i++ {
2685 if s.css[i].dc.dbmuClosed {
2686 s.css[i] = s.css[len(s.css)-1]
2687
2688 s.css[len(s.css)-1] = connStmt{}
2689 s.css = s.css[:len(s.css)-1]
2690 i--
2691 }
2692 }
2693 s.db.mu.Unlock()
2694 s.lastNumClosed = dbClosed
2695 }
2696
2697
2698
2699
2700 func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
2701 if err = s.stickyErr; err != nil {
2702 return
2703 }
2704 s.mu.Lock()
2705 if s.closed {
2706 s.mu.Unlock()
2707 err = errors.New("sql: statement is closed")
2708 return
2709 }
2710
2711
2712
2713 if s.cg != nil {
2714 s.mu.Unlock()
2715 dc, releaseConn, err = s.cg.grabConn(ctx)
2716 if err != nil {
2717 return
2718 }
2719 return dc, releaseConn, s.cgds, nil
2720 }
2721
2722 s.removeClosedStmtLocked()
2723 s.mu.Unlock()
2724
2725 dc, err = s.db.conn(ctx, strategy)
2726 if err != nil {
2727 return nil, nil, nil, err
2728 }
2729
2730 s.mu.Lock()
2731 for _, v := range s.css {
2732 if v.dc == dc {
2733 s.mu.Unlock()
2734 return dc, dc.releaseConn, v.ds, nil
2735 }
2736 }
2737 s.mu.Unlock()
2738
2739
2740 withLock(dc, func() {
2741 ds, err = s.prepareOnConnLocked(ctx, dc)
2742 })
2743 if err != nil {
2744 dc.releaseConn(err)
2745 return nil, nil, nil, err
2746 }
2747
2748 return dc, dc.releaseConn, ds, nil
2749 }
2750
2751
2752
2753 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2754 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2755 if err != nil {
2756 return nil, err
2757 }
2758 cs := connStmt{dc, si}
2759 s.mu.Lock()
2760 s.css = append(s.css, cs)
2761 s.mu.Unlock()
2762 return cs.ds, nil
2763 }
2764
2765
2766
2767 func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*Rows, error) {
2768 s.closemu.RLock()
2769 defer s.closemu.RUnlock()
2770
2771 var rowsi driver.Rows
2772 var rows *Rows
2773
2774 err := s.db.retry(func(strategy connReuseStrategy) error {
2775 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2776 if err != nil {
2777 return err
2778 }
2779
2780 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
2781 if err == nil {
2782
2783
2784 rows = &Rows{
2785 dc: dc,
2786 rowsi: rowsi,
2787
2788 }
2789
2790
2791 s.db.addDep(s, rows)
2792
2793
2794
2795 rows.releaseConn = func(err error) {
2796 releaseConn(err)
2797 s.db.removeDep(s, rows)
2798 }
2799 var txctx context.Context
2800 if s.cg != nil {
2801 txctx = s.cg.txCtx()
2802 }
2803 rows.initContextClose(ctx, txctx)
2804 return nil
2805 }
2806
2807 releaseConn(err)
2808 return err
2809 })
2810
2811 return rows, err
2812 }
2813
2814
2815
2816
2817
2818
2819 func (s *Stmt) Query(args ...any) (*Rows, error) {
2820 return s.QueryContext(context.Background(), args...)
2821 }
2822
2823 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (driver.Rows, error) {
2824 ds.Lock()
2825 defer ds.Unlock()
2826 dargs, err := driverArgsConnLocked(ci, ds, args)
2827 if err != nil {
2828 return nil, err
2829 }
2830 return ctxDriverStmtQuery(ctx, ds.si, dargs)
2831 }
2832
2833
2834
2835
2836
2837
2838
2839 func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *Row {
2840 rows, err := s.QueryContext(ctx, args...)
2841 if err != nil {
2842 return &Row{err: err}
2843 }
2844 return &Row{rows: rows}
2845 }
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861 func (s *Stmt) QueryRow(args ...any) *Row {
2862 return s.QueryRowContext(context.Background(), args...)
2863 }
2864
2865
2866 func (s *Stmt) Close() error {
2867 s.closemu.Lock()
2868 defer s.closemu.Unlock()
2869
2870 if s.stickyErr != nil {
2871 return s.stickyErr
2872 }
2873 s.mu.Lock()
2874 if s.closed {
2875 s.mu.Unlock()
2876 return nil
2877 }
2878 s.closed = true
2879 txds := s.cgds
2880 s.cgds = nil
2881
2882 s.mu.Unlock()
2883
2884 if s.cg == nil {
2885 return s.db.removeDep(s, s)
2886 }
2887
2888 if s.parentStmt != nil {
2889
2890
2891 return s.db.removeDep(s.parentStmt, s)
2892 }
2893 return txds.Close()
2894 }
2895
2896 func (s *Stmt) finalClose() error {
2897 s.mu.Lock()
2898 defer s.mu.Unlock()
2899 if s.css != nil {
2900 for _, v := range s.css {
2901 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2902 v.dc.removeOpenStmt(v.ds)
2903 }
2904 s.css = nil
2905 }
2906 return nil
2907 }
2908
2909
2910
2911 type Rows struct {
2912 dc *driverConn
2913 releaseConn func(error)
2914 rowsi driver.Rows
2915 cancel func()
2916 closeStmt *driverStmt
2917
2918 contextDone atomic.Pointer[error]
2919
2920
2921
2922
2923
2924
2925 closemu sync.RWMutex
2926 closed bool
2927 lasterr error
2928
2929
2930
2931 lastcols []driver.Value
2932
2933
2934
2935
2936
2937
2938
2939
2940 closemuScanHold bool
2941
2942
2943
2944
2945
2946 hitEOF bool
2947 }
2948
2949
2950
2951 func (rs *Rows) lasterrOrErrLocked(err error) error {
2952 if rs.lasterr != nil && rs.lasterr != io.EOF {
2953 return rs.lasterr
2954 }
2955 return err
2956 }
2957
2958
2959
2960 var bypassRowsAwaitDone = false
2961
2962 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2963 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
2964 return
2965 }
2966 if bypassRowsAwaitDone {
2967 return
2968 }
2969 closectx, cancel := context.WithCancel(ctx)
2970 rs.cancel = cancel
2971 go rs.awaitDone(ctx, txctx, closectx)
2972 }
2973
2974
2975
2976
2977
2978
2979 func (rs *Rows) awaitDone(ctx, txctx, closectx context.Context) {
2980 var txctxDone <-chan struct{}
2981 if txctx != nil {
2982 txctxDone = txctx.Done()
2983 }
2984 select {
2985 case <-ctx.Done():
2986 err := ctx.Err()
2987 rs.contextDone.Store(&err)
2988 case <-txctxDone:
2989 err := txctx.Err()
2990 rs.contextDone.Store(&err)
2991 case <-closectx.Done():
2992
2993
2994 }
2995 rs.close(ctx.Err())
2996 }
2997
2998
2999
3000
3001
3002
3003
3004 func (rs *Rows) Next() bool {
3005
3006
3007
3008 rs.closemuRUnlockIfHeldByScan()
3009
3010 if rs.contextDone.Load() != nil {
3011 return false
3012 }
3013
3014 var doClose, ok bool
3015 withLock(rs.closemu.RLocker(), func() {
3016 doClose, ok = rs.nextLocked()
3017 })
3018 if doClose {
3019 rs.Close()
3020 }
3021 if doClose && !ok {
3022 rs.hitEOF = true
3023 }
3024 return ok
3025 }
3026
3027 func (rs *Rows) nextLocked() (doClose, ok bool) {
3028 if rs.closed {
3029 return false, false
3030 }
3031
3032
3033
3034 rs.dc.Lock()
3035 defer rs.dc.Unlock()
3036
3037 if rs.lastcols == nil {
3038 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
3039 }
3040
3041 rs.lasterr = rs.rowsi.Next(rs.lastcols)
3042 if rs.lasterr != nil {
3043
3044 if rs.lasterr != io.EOF {
3045 return true, false
3046 }
3047 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3048 if !ok {
3049 return true, false
3050 }
3051
3052
3053
3054 if !nextResultSet.HasNextResultSet() {
3055 doClose = true
3056 }
3057 return doClose, false
3058 }
3059 return false, true
3060 }
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070 func (rs *Rows) NextResultSet() bool {
3071
3072
3073
3074 rs.closemuRUnlockIfHeldByScan()
3075
3076 var doClose bool
3077 defer func() {
3078 if doClose {
3079 rs.Close()
3080 }
3081 }()
3082 rs.closemu.RLock()
3083 defer rs.closemu.RUnlock()
3084
3085 if rs.closed {
3086 return false
3087 }
3088
3089 rs.lastcols = nil
3090 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3091 if !ok {
3092 doClose = true
3093 return false
3094 }
3095
3096
3097
3098 rs.dc.Lock()
3099 defer rs.dc.Unlock()
3100
3101 rs.lasterr = nextResultSet.NextResultSet()
3102 if rs.lasterr != nil {
3103 doClose = true
3104 return false
3105 }
3106 return true
3107 }
3108
3109
3110
3111 func (rs *Rows) Err() error {
3112
3113
3114
3115
3116 if !rs.hitEOF {
3117 if errp := rs.contextDone.Load(); errp != nil {
3118 return *errp
3119 }
3120 }
3121
3122 rs.closemu.RLock()
3123 defer rs.closemu.RUnlock()
3124 return rs.lasterrOrErrLocked(nil)
3125 }
3126
3127 var errRowsClosed = errors.New("sql: Rows are closed")
3128 var errNoRows = errors.New("sql: no Rows available")
3129
3130
3131
3132 func (rs *Rows) Columns() ([]string, error) {
3133 rs.closemu.RLock()
3134 defer rs.closemu.RUnlock()
3135 if rs.closed {
3136 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3137 }
3138 if rs.rowsi == nil {
3139 return nil, rs.lasterrOrErrLocked(errNoRows)
3140 }
3141 rs.dc.Lock()
3142 defer rs.dc.Unlock()
3143
3144 return rs.rowsi.Columns(), nil
3145 }
3146
3147
3148
3149 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
3150 rs.closemu.RLock()
3151 defer rs.closemu.RUnlock()
3152 if rs.closed {
3153 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3154 }
3155 if rs.rowsi == nil {
3156 return nil, rs.lasterrOrErrLocked(errNoRows)
3157 }
3158 rs.dc.Lock()
3159 defer rs.dc.Unlock()
3160
3161 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
3162 }
3163
3164
3165 type ColumnType struct {
3166 name string
3167
3168 hasNullable bool
3169 hasLength bool
3170 hasPrecisionScale bool
3171
3172 nullable bool
3173 length int64
3174 databaseType string
3175 precision int64
3176 scale int64
3177 scanType reflect.Type
3178 }
3179
3180
3181 func (ci *ColumnType) Name() string {
3182 return ci.name
3183 }
3184
3185
3186
3187
3188
3189
3190 func (ci *ColumnType) Length() (length int64, ok bool) {
3191 return ci.length, ci.hasLength
3192 }
3193
3194
3195
3196 func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
3197 return ci.precision, ci.scale, ci.hasPrecisionScale
3198 }
3199
3200
3201
3202
3203 func (ci *ColumnType) ScanType() reflect.Type {
3204 return ci.scanType
3205 }
3206
3207
3208
3209 func (ci *ColumnType) Nullable() (nullable, ok bool) {
3210 return ci.nullable, ci.hasNullable
3211 }
3212
3213
3214
3215
3216
3217
3218
3219 func (ci *ColumnType) DatabaseTypeName() string {
3220 return ci.databaseType
3221 }
3222
3223 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
3224 names := rowsi.Columns()
3225
3226 list := make([]*ColumnType, len(names))
3227 for i := range list {
3228 ci := &ColumnType{
3229 name: names[i],
3230 }
3231 list[i] = ci
3232
3233 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
3234 ci.scanType = prop.ColumnTypeScanType(i)
3235 } else {
3236 ci.scanType = reflect.TypeFor[any]()
3237 }
3238 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
3239 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
3240 }
3241 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
3242 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
3243 }
3244 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
3245 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
3246 }
3247 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
3248 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
3249 }
3250 }
3251 return list
3252 }
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314 func (rs *Rows) Scan(dest ...any) error {
3315 if rs.closemuScanHold {
3316
3317
3318 return fmt.Errorf("sql: Scan called without calling Next (closemuScanHold)")
3319 }
3320 rs.closemu.RLock()
3321
3322 if rs.lasterr != nil && rs.lasterr != io.EOF {
3323 rs.closemu.RUnlock()
3324 return rs.lasterr
3325 }
3326 if rs.closed {
3327 err := rs.lasterrOrErrLocked(errRowsClosed)
3328 rs.closemu.RUnlock()
3329 return err
3330 }
3331
3332 if scanArgsContainRawBytes(dest) {
3333 rs.closemuScanHold = true
3334 } else {
3335 rs.closemu.RUnlock()
3336 }
3337
3338 if rs.lastcols == nil {
3339 rs.closemuRUnlockIfHeldByScan()
3340 return errors.New("sql: Scan called without calling Next")
3341 }
3342 if len(dest) != len(rs.lastcols) {
3343 rs.closemuRUnlockIfHeldByScan()
3344 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
3345 }
3346
3347 for i, sv := range rs.lastcols {
3348 err := convertAssignRows(dest[i], sv, rs)
3349 if err != nil {
3350 rs.closemuRUnlockIfHeldByScan()
3351 return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
3352 }
3353 }
3354 return nil
3355 }
3356
3357
3358
3359 func (rs *Rows) closemuRUnlockIfHeldByScan() {
3360 if rs.closemuScanHold {
3361 rs.closemuScanHold = false
3362 rs.closemu.RUnlock()
3363 }
3364 }
3365
3366 func scanArgsContainRawBytes(args []any) bool {
3367 for _, a := range args {
3368 if _, ok := a.(*RawBytes); ok {
3369 return true
3370 }
3371 }
3372 return false
3373 }
3374
3375
3376
3377 var rowsCloseHook = func() func(*Rows, *error) { return nil }
3378
3379
3380
3381
3382
3383 func (rs *Rows) Close() error {
3384
3385
3386
3387 rs.closemuRUnlockIfHeldByScan()
3388
3389 return rs.close(nil)
3390 }
3391
3392 func (rs *Rows) close(err error) error {
3393 rs.closemu.Lock()
3394 defer rs.closemu.Unlock()
3395
3396 if rs.closed {
3397 return nil
3398 }
3399 rs.closed = true
3400
3401 if rs.lasterr == nil {
3402 rs.lasterr = err
3403 }
3404
3405 withLock(rs.dc, func() {
3406 err = rs.rowsi.Close()
3407 })
3408 if fn := rowsCloseHook(); fn != nil {
3409 fn(rs, &err)
3410 }
3411 if rs.cancel != nil {
3412 rs.cancel()
3413 }
3414
3415 if rs.closeStmt != nil {
3416 rs.closeStmt.Close()
3417 }
3418 rs.releaseConn(err)
3419
3420 rs.lasterr = rs.lasterrOrErrLocked(err)
3421 return err
3422 }
3423
3424
3425 type Row struct {
3426
3427 err error
3428 rows *Rows
3429 }
3430
3431
3432
3433
3434
3435
3436 func (r *Row) Scan(dest ...any) error {
3437 if r.err != nil {
3438 return r.err
3439 }
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454 defer r.rows.Close()
3455 for _, dp := range dest {
3456 if _, ok := dp.(*RawBytes); ok {
3457 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
3458 }
3459 }
3460
3461 if !r.rows.Next() {
3462 if err := r.rows.Err(); err != nil {
3463 return err
3464 }
3465 return ErrNoRows
3466 }
3467 err := r.rows.Scan(dest...)
3468 if err != nil {
3469 return err
3470 }
3471
3472 return r.rows.Close()
3473 }
3474
3475
3476
3477
3478
3479 func (r *Row) Err() error {
3480 return r.err
3481 }
3482
3483
3484 type Result interface {
3485
3486
3487
3488
3489
3490 LastInsertId() (int64, error)
3491
3492
3493
3494
3495 RowsAffected() (int64, error)
3496 }
3497
3498 type driverResult struct {
3499 sync.Locker
3500 resi driver.Result
3501 }
3502
3503 func (dr driverResult) LastInsertId() (int64, error) {
3504 dr.Lock()
3505 defer dr.Unlock()
3506 return dr.resi.LastInsertId()
3507 }
3508
3509 func (dr driverResult) RowsAffected() (int64, error) {
3510 dr.Lock()
3511 defer dr.Unlock()
3512 return dr.resi.RowsAffected()
3513 }
3514
3515 func stack() string {
3516 var buf [2 << 10]byte
3517 return string(buf[:runtime.Stack(buf[:], false)])
3518 }
3519
3520
3521 func withLock(lk sync.Locker, fn func()) {
3522 lk.Lock()
3523 defer lk.Unlock()
3524 fn()
3525 }
3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537 type connRequestSet struct {
3538
3539 s []connRequestAndIndex
3540 }
3541
3542 type connRequestAndIndex struct {
3543
3544 req chan connRequest
3545
3546
3547
3548 curIdx *int
3549 }
3550
3551
3552
3553 func (s *connRequestSet) CloseAndRemoveAll() {
3554 for _, v := range s.s {
3555 close(v.req)
3556 }
3557 s.s = nil
3558 }
3559
3560
3561 func (s *connRequestSet) Len() int { return len(s.s) }
3562
3563
3564
3565 type connRequestDelHandle struct {
3566 idx *int
3567 }
3568
3569
3570
3571
3572 func (s *connRequestSet) Add(v chan connRequest) connRequestDelHandle {
3573 idx := len(s.s)
3574
3575
3576
3577
3578
3579
3580
3581
3582 idxPtr := &idx
3583 s.s = append(s.s, connRequestAndIndex{v, idxPtr})
3584 return connRequestDelHandle{idxPtr}
3585 }
3586
3587
3588
3589
3590
3591 func (s *connRequestSet) Delete(h connRequestDelHandle) bool {
3592 idx := *h.idx
3593 if idx < 0 {
3594 return false
3595 }
3596 s.deleteIndex(idx)
3597 return true
3598 }
3599
3600 func (s *connRequestSet) deleteIndex(idx int) {
3601
3602 *(s.s[idx].curIdx) = -1
3603
3604
3605 if idx < len(s.s)-1 {
3606 last := s.s[len(s.s)-1]
3607 *last.curIdx = idx
3608 s.s[idx] = last
3609 }
3610
3611 s.s[len(s.s)-1] = connRequestAndIndex{}
3612 s.s = s.s[:len(s.s)-1]
3613 }
3614
3615
3616
3617
3618 func (s *connRequestSet) TakeRandom() (v chan connRequest, ok bool) {
3619 if len(s.s) == 0 {
3620 return nil, false
3621 }
3622 pick := rand.IntN(len(s.s))
3623 e := s.s[pick]
3624 s.deleteIndex(pick)
3625 return e.req, true
3626 }
3627
View as plain text