Source file
src/runtime/netpoll.go
1
2
3
4
5
6
7 package runtime
8
9 import (
10 "internal/runtime/atomic"
11 "runtime/internal/sys"
12 "unsafe"
13 )
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Error codes returned by netpollcheckerr and by the poll_runtime_*
// functions that report a status.
// NOTE(review): the numeric values are presumably mirrored by the caller
// outside the runtime — confirm before changing them.
const (
	pollNoError        = 0 // no error condition detected
	pollErrClosing     = 1 // the pollClosing bit is set on the descriptor
	pollErrTimeout     = 2 // the deadline for the requested mode has expired
	pollErrNotPollable = 3 // the pollEventErr bit is set (reported for reads only)
)
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Sentinel values stored in pollDesc.rg and pollDesc.wg. Any other value is
// treated as a *g converted to uintptr (see netpollblockcommit and
// netpollunblock).
const (
	pdNil   uintptr = 0 // nothing pending and nobody waiting
	pdReady uintptr = 1 // a readiness notification is pending; netpollblock consumes it by resetting to pdNil
	pdWait  uintptr = 2 // a goroutine is about to park; netpollblockcommit replaces it with the g pointer
)
69
// pollBlockSize is the target size in bytes of each batch of pollDescs that
// pollCache.alloc requests from persistentalloc.
const pollBlockSize = 4 * 1024
71
72
73
74
// pollDesc holds the runtime's polling state for one descriptor.
//
// As used in this file: fdseq, atomicInfo, rg and wg are atomics that are
// accessed without pd.lock; the fields from closing onward are written with
// pd.lock held (poll_runtime_pollClose reads closing without it). After
// changing closing, rd, wd or fdseq, callers invoke publishInfo to refresh
// atomicInfo.
type pollDesc struct {
	_     sys.NotInHeap
	link  *pollDesc      // next free descriptor in pollCache; written under pollCache.lock
	fd    uintptr        // descriptor value stored by poll_runtime_pollOpen and passed to netpollclose
	fdseq atomic.Uintptr // generation counter, incremented by pollCache.free on every release

	// atomicInfo is a packed pollInfo snapshot (closing, event error,
	// expired deadlines, low fdseq bits) that netpollcheckerr reads
	// without taking lock.
	atomicInfo atomic.Uint32

	// rg and wg hold pdNil, pdReady, pdWait, or the *g (as a uintptr) of
	// the goroutine parked for read or write respectively.
	rg atomic.Uintptr
	wg atomic.Uintptr

	lock    mutex
	closing bool      // set by poll_runtime_pollUnblock, cleared by poll_runtime_pollOpen
	rrun    bool      // whether the read timer rt is armed
	wrun    bool      // whether the write timer wt is armed
	user    uint32    // not referenced anywhere in this file
	rseq    uintptr   // read timer generation; netpolldeadlineimpl ignores callbacks whose seq differs
	rt      timer     // read deadline timer (also carries the combined read+write deadline)
	rd      int64     // read deadline: 0 unset, >0 a nanotime value, <0 expired
	wseq    uintptr   // write timer generation
	wt      timer     // write deadline timer
	wd      int64     // write deadline, same encoding as rd
	self    *pollDesc // points back at this pollDesc; makeArg uses &self as interface data
}
116
117
118
119
120
// pollInfo is the bit-packed value stored in pollDesc.atomicInfo.
type pollInfo uint32
122
// Bits of pollInfo. The first four are single-bit flags. Because of the
// 1<<iota form, pollFDSeq evaluates to 1<<4 = 16; publishInfo and
// setEventErr use it as the shift count of the fdseq field, not as a flag.
// NOTE(review): a pollFDSeqBits-wide (20-bit) value shifted left by 16 in a
// uint32 loses its top 4 bits — confirm that this is intended.
const (
	pollClosing = 1 << iota
	pollEventErr
	pollExpiredReadDeadline
	pollExpiredWriteDeadline
	pollFDSeq
)
130
const (
	pollFDSeqBits = 20                   // nominal width in bits of the fdseq field
	pollFDSeqMask = 1<<pollFDSeqBits - 1 // mask selecting the low pollFDSeqBits bits
)
135
136 func (i pollInfo) closing() bool { return i&pollClosing != 0 }
137 func (i pollInfo) eventErr() bool { return i&pollEventErr != 0 }
138 func (i pollInfo) expiredReadDeadline() bool { return i&pollExpiredReadDeadline != 0 }
139 func (i pollInfo) expiredWriteDeadline() bool { return i&pollExpiredWriteDeadline != 0 }
140
141
142 func (pd *pollDesc) info() pollInfo {
143 return pollInfo(pd.atomicInfo.Load())
144 }
145
146
147
148
149
150
151
152
153 func (pd *pollDesc) publishInfo() {
154 var info uint32
155 if pd.closing {
156 info |= pollClosing
157 }
158 if pd.rd < 0 {
159 info |= pollExpiredReadDeadline
160 }
161 if pd.wd < 0 {
162 info |= pollExpiredWriteDeadline
163 }
164 info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq
165
166
167 x := pd.atomicInfo.Load()
168 for !pd.atomicInfo.CompareAndSwap(x, (x&pollEventErr)|info) {
169 x = pd.atomicInfo.Load()
170 }
171 }
172
173
174
175
176 func (pd *pollDesc) setEventErr(b bool, seq uintptr) {
177 mSeq := uint32(seq & pollFDSeqMask)
178 x := pd.atomicInfo.Load()
179 xSeq := (x >> pollFDSeq) & pollFDSeqMask
180 if seq != 0 && xSeq != mSeq {
181 return
182 }
183 for (x&pollEventErr != 0) != b && !pd.atomicInfo.CompareAndSwap(x, x^pollEventErr) {
184 x = pd.atomicInfo.Load()
185 xSeq := (x >> pollFDSeq) & pollFDSeqMask
186 if seq != 0 && xSeq != mSeq {
187 return
188 }
189 }
190 }
191
// pollCache is a free list of pollDescs chained through pollDesc.link.
type pollCache struct {
	lock  mutex     // held by alloc and free while they touch first and link
	first *pollDesc // head of the free list; nil when the cache is empty
}
201
var (
	netpollInitLock mutex         // serializes the one-time netpollinit call in netpollGenericInit
	netpollInited   atomic.Uint32 // set to 1 once netpollinit has returned

	pollcache      pollCache     // global free list of pollDescs
	netpollWaiters atomic.Uint32 // incremented in netpollblockcommit, adjusted via netpollAdjustWaiters
)
209
210
// poll_runtime_pollServerInit makes sure the network poller is initialized
// by delegating to netpollGenericInit.
// NOTE(review): the name suggests this is reached via linkname from another
// package; no such directive is visible here — confirm.
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}
214
215 func netpollGenericInit() {
216 if netpollInited.Load() == 0 {
217 lockInit(&netpollInitLock, lockRankNetpollInit)
218 lockInit(&pollcache.lock, lockRankPollCache)
219 lock(&netpollInitLock)
220 if netpollInited.Load() == 0 {
221 netpollinit()
222 netpollInited.Store(1)
223 }
224 unlock(&netpollInitLock)
225 }
226 }
227
// netpollinited reports whether netpollGenericInit has completed
// netpollinit, i.e. whether the netpollInited flag is non-zero.
func netpollinited() bool {
	return netpollInited.Load() != 0
}
231
232
233
234
235
// poll_runtime_isPollServerDescriptor returns the result of
// netpollIsPollDescriptor(fd).
// NOTE(review): presumably true when fd is a descriptor owned by the poller
// itself; the helper is defined outside this view — confirm.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}
239
240
241 func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
242 pd := pollcache.alloc()
243 lock(&pd.lock)
244 wg := pd.wg.Load()
245 if wg != pdNil && wg != pdReady {
246 throw("runtime: blocked write on free polldesc")
247 }
248 rg := pd.rg.Load()
249 if rg != pdNil && rg != pdReady {
250 throw("runtime: blocked read on free polldesc")
251 }
252 pd.fd = fd
253 if pd.fdseq.Load() == 0 {
254
255 pd.fdseq.Store(1)
256 }
257 pd.closing = false
258 pd.setEventErr(false, 0)
259 pd.rseq++
260 pd.rg.Store(pdNil)
261 pd.rd = 0
262 pd.wseq++
263 pd.wg.Store(pdNil)
264 pd.wd = 0
265 pd.self = pd
266 pd.publishInfo()
267 unlock(&pd.lock)
268
269 errno := netpollopen(fd, pd)
270 if errno != 0 {
271 pollcache.free(pd)
272 return nil, int(errno)
273 }
274 return pd, 0
275 }
276
277
278 func poll_runtime_pollClose(pd *pollDesc) {
279 if !pd.closing {
280 throw("runtime: close polldesc w/o unblock")
281 }
282 wg := pd.wg.Load()
283 if wg != pdNil && wg != pdReady {
284 throw("runtime: blocked write on closing polldesc")
285 }
286 rg := pd.rg.Load()
287 if rg != pdNil && rg != pdReady {
288 throw("runtime: blocked read on closing polldesc")
289 }
290 netpollclose(pd.fd)
291 pollcache.free(pd)
292 }
293
294 func (c *pollCache) free(pd *pollDesc) {
295
296
297 lock(&pd.lock)
298
299
300
301 fdseq := pd.fdseq.Load()
302 fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1)
303 pd.fdseq.Store(fdseq)
304
305 pd.publishInfo()
306
307 unlock(&pd.lock)
308
309 lock(&c.lock)
310 pd.link = c.first
311 c.first = pd
312 unlock(&c.lock)
313 }
314
315
316
317
318
319
320 func poll_runtime_pollReset(pd *pollDesc, mode int) int {
321 errcode := netpollcheckerr(pd, int32(mode))
322 if errcode != pollNoError {
323 return errcode
324 }
325 if mode == 'r' {
326 pd.rg.Store(pdNil)
327 } else if mode == 'w' {
328 pd.wg.Store(pdNil)
329 }
330 return pollNoError
331 }
332
333
334
335
336
337
338
339 func poll_runtime_pollWait(pd *pollDesc, mode int) int {
340 errcode := netpollcheckerr(pd, int32(mode))
341 if errcode != pollNoError {
342 return errcode
343 }
344
345 if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
346 netpollarm(pd, mode)
347 }
348 for !netpollblock(pd, int32(mode), false) {
349 errcode = netpollcheckerr(pd, int32(mode))
350 if errcode != pollNoError {
351 return errcode
352 }
353
354
355
356 }
357 return pollNoError
358 }
359
360
361 func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
362
363
364 for !netpollblock(pd, int32(mode), true) {
365 }
366 }
367
368
// poll_runtime_pollSetDeadline updates the read and/or write deadline of pd
// and re-arms or stops the matching timers.
//
// mode is 'r', 'w' or 'r'+'w'. A positive d is added to nanotime() to form
// the stored deadline; zero and negative values are stored unchanged. A
// negative stored deadline counts as already expired, and any goroutine
// parked for that direction is woken before returning. The call is a no-op
// when pd is closing.
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	// Remember the previous state so unchanged timers can be left alone.
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// The addition overflowed int64; clamp to the largest
			// representable deadline.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	pd.publishInfo()
	// combo: both deadlines are set and identical, so the read timer alone
	// serves both directions through netpollDeadline and the write timer is
	// not armed.
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if !pd.rrun {
		if pd.rd > 0 {
			// NOTE(review): modify's arguments are presumably
			// (when, period, callback, arg, seq) — confirm against
			// the timer type.
			pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
			pd.rrun = true
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // netpolldeadlineimpl drops callbacks carrying the old seq
		if pd.rd > 0 {
			pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
		} else {
			pd.rt.stop()
			pd.rrun = false
		}
	}
	if !pd.wrun {
		if pd.wd > 0 && !combo {
			pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
			pd.wrun = true
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // netpolldeadlineimpl drops callbacks carrying the old seq
		if pd.wd > 0 && !combo {
			pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
		} else {
			pd.wt.stop()
			pd.wrun = false
		}
	}
	// A negative deadline is already expired: take any parked goroutine
	// off the descriptor now.
	delta := int32(0)
	var rg, wg *g
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false, &delta)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false, &delta)
	}
	unlock(&pd.lock)
	// The goroutines are made runnable only after pd.lock is released.
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
	netpollAdjustWaiters(delta)
}
447
448
449 func poll_runtime_pollUnblock(pd *pollDesc) {
450 lock(&pd.lock)
451 if pd.closing {
452 throw("runtime: unblock on closing polldesc")
453 }
454 pd.closing = true
455 pd.rseq++
456 pd.wseq++
457 var rg, wg *g
458 pd.publishInfo()
459 delta := int32(0)
460 rg = netpollunblock(pd, 'r', false, &delta)
461 wg = netpollunblock(pd, 'w', false, &delta)
462 if pd.rrun {
463 pd.rt.stop()
464 pd.rrun = false
465 }
466 if pd.wrun {
467 pd.wt.stop()
468 pd.wrun = false
469 }
470 unlock(&pd.lock)
471 if rg != nil {
472 netpollgoready(rg, 3)
473 }
474 if wg != nil {
475 netpollgoready(wg, 3)
476 }
477 netpollAdjustWaiters(delta)
478 }
479
480
481
482
483
484
485
486
487
488
489
490
491 func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
492 delta := int32(0)
493 var rg, wg *g
494 if mode == 'r' || mode == 'r'+'w' {
495 rg = netpollunblock(pd, 'r', true, &delta)
496 }
497 if mode == 'w' || mode == 'r'+'w' {
498 wg = netpollunblock(pd, 'w', true, &delta)
499 }
500 if rg != nil {
501 toRun.push(rg)
502 }
503 if wg != nil {
504 toRun.push(wg)
505 }
506 return delta
507 }
508
509 func netpollcheckerr(pd *pollDesc, mode int32) int {
510 info := pd.info()
511 if info.closing() {
512 return pollErrClosing
513 }
514 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
515 return pollErrTimeout
516 }
517
518
519
520 if mode == 'r' && info.eventErr() {
521 return pollErrNotPollable
522 }
523 return pollNoError
524 }
525
526 func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
527 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
528 if r {
529
530
531
532 netpollAdjustWaiters(1)
533 }
534 return r
535 }
536
// netpollgoready makes gp runnable via goready. traceskip is incremented by
// one before being passed on, which accounts for this wrapper's own frame.
func netpollgoready(gp *g, traceskip int) {
	goready(gp, traceskip+1)
}
540
541
542
543
544
// netpollblock waits for a readiness notification on pd for mode ('w'
// selects the write slot, anything else the read slot).
//
// It returns true if a pdReady notification was consumed, and false if the
// goroutine was released for another reason. With waitio set the goroutine
// parks even when netpollcheckerr reports an error for pd.
// It throws if the slot already holds another waiter.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// Move the slot to pdWait, or consume a notification that is already
	// pending.
	for {
		// A pending notification is consumed without parking.
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true
		}
		if gpp.CompareAndSwap(pdNil, pdWait) {
			break
		}

		// Both swaps failed. That is only legitimate if the slot changed
		// between them; any other value means a second waiter.
		if v := gpp.Load(); v != pdReady && v != pdNil {
			throw("runtime: double wait")
		}
	}

	// The error state is re-checked after publishing pdWait, so an
	// unblock that ran before pdWait was set is not missed.
	// NOTE(review): gopark presumably does not park if netpollblockcommit
	// returns false — confirm against gopark.
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
	}
	// Reset the slot and see what it held: pdReady means readiness was
	// signalled, and anything above pdWait is unexpected here.
	old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}
581
582
583
584
585
586
587
// netpollunblock releases whatever is recorded in pd's slot for mode ('w'
// selects the write slot, anything else the read slot).
//
// With ioready set the slot becomes pdReady, otherwise pdNil. It returns
// the goroutine that was parked in the slot, or nil if there was none.
// *delta is decremented when a parked goroutine is removed.
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := gpp.Load()
		if old == pdReady {
			// A notification is already pending; nothing to release.
			return nil
		}
		if old == pdNil && !ioready {
			// Nobody is waiting and there is no readiness to record,
			// so the slot is left untouched.
			return nil
		}
		new := pdNil
		if ioready {
			new = pdReady
		}
		if gpp.CompareAndSwap(old, new) {
			if old == pdWait {
				// The waiter had not committed its g yet, so there
				// is no goroutine to hand back.
				old = pdNil
			} else if old != pdNil {
				// old is a committed g pointer: one waiter fewer.
				*delta -= 1
			}
			return (*g)(unsafe.Pointer(old))
		}
		// The slot changed under us; retry with the fresh value.
	}
}
618
619 func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
620 lock(&pd.lock)
621
622
623 currentSeq := pd.rseq
624 if !read {
625 currentSeq = pd.wseq
626 }
627 if seq != currentSeq {
628
629 unlock(&pd.lock)
630 return
631 }
632 delta := int32(0)
633 var rg *g
634 if read {
635 if pd.rd <= 0 || !pd.rrun {
636 throw("runtime: inconsistent read deadline")
637 }
638 pd.rd = -1
639 pd.publishInfo()
640 rg = netpollunblock(pd, 'r', false, &delta)
641 }
642 var wg *g
643 if write {
644 if pd.wd <= 0 || !pd.wrun && !read {
645 throw("runtime: inconsistent write deadline")
646 }
647 pd.wd = -1
648 pd.publishInfo()
649 wg = netpollunblock(pd, 'w', false, &delta)
650 }
651 unlock(&pd.lock)
652 if rg != nil {
653 netpollgoready(rg, 0)
654 }
655 if wg != nil {
656 netpollgoready(wg, 0)
657 }
658 netpollAdjustWaiters(delta)
659 }
660
661 func netpollDeadline(arg any, seq uintptr, delta int64) {
662 netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
663 }
664
665 func netpollReadDeadline(arg any, seq uintptr, delta int64) {
666 netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
667 }
668
669 func netpollWriteDeadline(arg any, seq uintptr, delta int64) {
670 netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
671 }
672
673
674 func netpollAnyWaiters() bool {
675 return netpollWaiters.Load() > 0
676 }
677
678
679 func netpollAdjustWaiters(delta int32) {
680 if delta != 0 {
681 netpollWaiters.Add(delta)
682 }
683 }
684
685 func (c *pollCache) alloc() *pollDesc {
686 lock(&c.lock)
687 if c.first == nil {
688 const pdSize = unsafe.Sizeof(pollDesc{})
689 n := pollBlockSize / pdSize
690 if n == 0 {
691 n = 1
692 }
693
694
695 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
696 for i := uintptr(0); i < n; i++ {
697 pd := (*pollDesc)(add(mem, i*pdSize))
698 lockInit(&pd.lock, lockRankPollDesc)
699 pd.rt.init(nil, nil)
700 pd.wt.init(nil, nil)
701 pd.link = c.first
702 c.first = pd
703 }
704 }
705 pd := c.first
706 c.first = pd.link
707 unlock(&c.lock)
708 return pd
709 }
710
711
712
713
714
715
// makeArg builds an interface value holding pd by writing the interface
// words directly: the type word is pdType and the data word is the address
// of pd.self. poll_runtime_pollOpen sets pd.self = pd, so the data word
// points at storage containing the *pollDesc.
// NOTE(review): this presumably exists to avoid the allocation or heap
// checks an ordinary conversion would involve for a not-in-heap type —
// confirm.
func (pd *pollDesc) makeArg() (i any) {
	x := (*eface)(unsafe.Pointer(&i))
	x._type = pdType
	x.data = unsafe.Pointer(&pd.self)
	return
}
722
var (
	pdEface any    = (*pollDesc)(nil)         // nil *pollDesc boxed in an interface, used only to obtain its type word
	pdType  *_type = efaceOf(&pdEface)._type // runtime type descriptor for *pollDesc, consumed by makeArg
)
727
View as plain text