Source file
src/net/sendfile_test.go
1
2
3
4
5 package net
6
7 import (
8 "bytes"
9 "context"
10 "crypto/sha256"
11 "encoding/hex"
12 "errors"
13 "fmt"
14 "io"
15 "os"
16 "runtime"
17 "strconv"
18 "sync"
19 "testing"
20 "time"
21 )
22
23 const (
24 newton = "../testdata/Isaac.Newton-Opticks.txt"
25 newtonLen = 567198
26 newtonSHA256 = "d4a9ac22462b35e7821a4f2706c211093da678620a8f9997989ee7cf8d507bbd"
27 )
28
29 func TestSendfile(t *testing.T) {
30 ln := newLocalListener(t, "tcp")
31 defer ln.Close()
32
33 errc := make(chan error, 1)
34 go func(ln Listener) {
35
36 conn, err := ln.Accept()
37 if err != nil {
38 errc <- err
39 close(errc)
40 return
41 }
42
43 go func() {
44 defer close(errc)
45 defer conn.Close()
46
47 f, err := os.Open(newton)
48 if err != nil {
49 errc <- err
50 return
51 }
52 defer f.Close()
53
54
55
56 sbytes, err := io.Copy(conn, f)
57 if err != nil {
58 errc <- err
59 return
60 }
61
62 if sbytes != newtonLen {
63 errc <- fmt.Errorf("sent %d bytes; expected %d", sbytes, newtonLen)
64 return
65 }
66 }()
67 }(ln)
68
69
70
71 c, err := Dial("tcp", ln.Addr().String())
72 if err != nil {
73 t.Fatal(err)
74 }
75 defer c.Close()
76
77 h := sha256.New()
78 rbytes, err := io.Copy(h, c)
79 if err != nil {
80 t.Error(err)
81 }
82
83 if rbytes != newtonLen {
84 t.Errorf("received %d bytes; expected %d", rbytes, newtonLen)
85 }
86
87 if res := hex.EncodeToString(h.Sum(nil)); res != newtonSHA256 {
88 t.Error("retrieved data hash did not match")
89 }
90
91 for err := range errc {
92 t.Error(err)
93 }
94 }
95
96 func TestSendfileParts(t *testing.T) {
97 ln := newLocalListener(t, "tcp")
98 defer ln.Close()
99
100 errc := make(chan error, 1)
101 go func(ln Listener) {
102
103 conn, err := ln.Accept()
104 if err != nil {
105 errc <- err
106 close(errc)
107 return
108 }
109
110 go func() {
111 defer close(errc)
112 defer conn.Close()
113
114 f, err := os.Open(newton)
115 if err != nil {
116 errc <- err
117 return
118 }
119 defer f.Close()
120
121 for i := 0; i < 3; i++ {
122
123
124 _, err = io.CopyN(conn, f, 3)
125 if err != nil {
126 errc <- err
127 return
128 }
129 }
130 }()
131 }(ln)
132
133 c, err := Dial("tcp", ln.Addr().String())
134 if err != nil {
135 t.Fatal(err)
136 }
137 defer c.Close()
138
139 buf := new(bytes.Buffer)
140 buf.ReadFrom(c)
141
142 if want, have := "Produced ", buf.String(); have != want {
143 t.Errorf("unexpected server reply %q, want %q", have, want)
144 }
145
146 for err := range errc {
147 t.Error(err)
148 }
149 }
150
151 func TestSendfileSeeked(t *testing.T) {
152 ln := newLocalListener(t, "tcp")
153 defer ln.Close()
154
155 const seekTo = 65 << 10
156 const sendSize = 10 << 10
157
158 errc := make(chan error, 1)
159 go func(ln Listener) {
160
161 conn, err := ln.Accept()
162 if err != nil {
163 errc <- err
164 close(errc)
165 return
166 }
167
168 go func() {
169 defer close(errc)
170 defer conn.Close()
171
172 f, err := os.Open(newton)
173 if err != nil {
174 errc <- err
175 return
176 }
177 defer f.Close()
178 if _, err := f.Seek(seekTo, io.SeekStart); err != nil {
179 errc <- err
180 return
181 }
182
183 _, err = io.CopyN(conn, f, sendSize)
184 if err != nil {
185 errc <- err
186 return
187 }
188 }()
189 }(ln)
190
191 c, err := Dial("tcp", ln.Addr().String())
192 if err != nil {
193 t.Fatal(err)
194 }
195 defer c.Close()
196
197 buf := new(bytes.Buffer)
198 buf.ReadFrom(c)
199
200 if buf.Len() != sendSize {
201 t.Errorf("Got %d bytes; want %d", buf.Len(), sendSize)
202 }
203
204 for err := range errc {
205 t.Error(err)
206 }
207 }
208
209
210 func TestSendfilePipe(t *testing.T) {
211 switch runtime.GOOS {
212 case "plan9", "windows", "js", "wasip1":
213
214 t.Skipf("skipping on %s", runtime.GOOS)
215 }
216
217 t.Parallel()
218
219 ln := newLocalListener(t, "tcp")
220 defer ln.Close()
221
222 r, w, err := os.Pipe()
223 if err != nil {
224 t.Fatal(err)
225 }
226 defer w.Close()
227 defer r.Close()
228
229 copied := make(chan bool)
230
231 var wg sync.WaitGroup
232 wg.Add(1)
233 go func() {
234
235
236 defer wg.Done()
237 conn, err := ln.Accept()
238 if err != nil {
239 t.Error(err)
240 return
241 }
242 defer conn.Close()
243 _, err = io.CopyN(conn, r, 1)
244 if err != nil {
245 t.Error(err)
246 return
247 }
248
249 close(copied)
250 }()
251
252 wg.Add(1)
253 go func() {
254
255 defer wg.Done()
256 _, err := w.Write([]byte{'a'})
257 if err != nil {
258 t.Error(err)
259 }
260 }()
261
262 wg.Add(1)
263 go func() {
264
265
266 defer wg.Done()
267 conn, err := Dial("tcp", ln.Addr().String())
268 if err != nil {
269 t.Error(err)
270 return
271 }
272 defer conn.Close()
273 io.Copy(io.Discard, conn)
274 }()
275
276
277
278 <-copied
279
280
281 if err := r.SetDeadline(time.Now().Add(time.Microsecond)); err != nil {
282 t.Fatal(err)
283 }
284
285 wg.Add(1)
286 go func() {
287
288
289 defer wg.Done()
290 time.Sleep(50 * time.Millisecond)
291 w.Write([]byte{'b'})
292 }()
293
294
295
296 _, err = r.Read(make([]byte, 1))
297 if err == nil {
298 t.Error("Read did not time out")
299 } else if !os.IsTimeout(err) {
300 t.Errorf("got error %v, expected a time out", err)
301 }
302
303 wg.Wait()
304 }
305
306
307 func TestSendfileOnWriteTimeoutExceeded(t *testing.T) {
308 ln := newLocalListener(t, "tcp")
309 defer ln.Close()
310
311 errc := make(chan error, 1)
312 go func(ln Listener) (retErr error) {
313 defer func() {
314 errc <- retErr
315 close(errc)
316 }()
317
318 conn, err := ln.Accept()
319 if err != nil {
320 return err
321 }
322 defer conn.Close()
323
324
325
326 if err := conn.SetWriteDeadline(time.Now().Add(-1 * time.Hour)); err != nil {
327 return err
328 }
329
330 f, err := os.Open(newton)
331 if err != nil {
332 return err
333 }
334 defer f.Close()
335
336 _, err = io.Copy(conn, f)
337 if errors.Is(err, os.ErrDeadlineExceeded) {
338 return nil
339 }
340
341 if err == nil {
342 err = fmt.Errorf("expected ErrDeadlineExceeded, but got nil")
343 }
344 return err
345 }(ln)
346
347 conn, err := Dial("tcp", ln.Addr().String())
348 if err != nil {
349 t.Fatal(err)
350 }
351 defer conn.Close()
352
353 n, err := io.Copy(io.Discard, conn)
354 if err != nil {
355 t.Fatalf("expected nil error, but got %v", err)
356 }
357 if n != 0 {
358 t.Fatalf("expected receive zero, but got %d byte(s)", n)
359 }
360
361 if err := <-errc; err != nil {
362 t.Fatal(err)
363 }
364 }
365
366 func BenchmarkSendfileZeroBytes(b *testing.B) {
367 var (
368 wg sync.WaitGroup
369 ctx, cancel = context.WithCancel(context.Background())
370 )
371
372 defer wg.Wait()
373
374 ln := newLocalListener(b, "tcp")
375 defer ln.Close()
376
377 tempFile, err := os.CreateTemp(b.TempDir(), "test.txt")
378 if err != nil {
379 b.Fatalf("failed to create temp file: %v", err)
380 }
381 defer tempFile.Close()
382
383 fileName := tempFile.Name()
384
385 dataSize := b.N
386 wg.Add(1)
387 go func(f *os.File) {
388 defer wg.Done()
389
390 for i := 0; i < dataSize; i++ {
391 if _, err := f.Write([]byte{1}); err != nil {
392 b.Errorf("failed to write: %v", err)
393 return
394 }
395 if i%1000 == 0 {
396 f.Sync()
397 }
398 }
399 }(tempFile)
400
401 b.ResetTimer()
402 b.ReportAllocs()
403
404 wg.Add(1)
405 go func(ln Listener, fileName string) {
406 defer wg.Done()
407
408 conn, err := ln.Accept()
409 if err != nil {
410 b.Errorf("failed to accept: %v", err)
411 return
412 }
413 defer conn.Close()
414
415 f, err := os.OpenFile(fileName, os.O_RDONLY, 0660)
416 if err != nil {
417 b.Errorf("failed to open file: %v", err)
418 return
419 }
420 defer f.Close()
421
422 for {
423 if ctx.Err() != nil {
424 return
425 }
426
427 if _, err := io.Copy(conn, f); err != nil {
428 b.Errorf("failed to copy: %v", err)
429 return
430 }
431 }
432 }(ln, fileName)
433
434 conn, err := Dial("tcp", ln.Addr().String())
435 if err != nil {
436 b.Fatalf("failed to dial: %v", err)
437 }
438 defer conn.Close()
439
440 n, err := io.CopyN(io.Discard, conn, int64(dataSize))
441 if err != nil {
442 b.Fatalf("failed to copy: %v", err)
443 }
444 if n != int64(dataSize) {
445 b.Fatalf("expected %d copied bytes, but got %d", dataSize, n)
446 }
447
448 cancel()
449 }
450
451 func BenchmarkSendFile(b *testing.B) {
452 if runtime.GOOS == "windows" {
453
454
455 b.Skipf("skipping on %s", runtime.GOOS)
456 }
457
458 b.Run("file-to-tcp", func(b *testing.B) { benchmarkSendFile(b, "tcp") })
459 b.Run("file-to-unix", func(b *testing.B) { benchmarkSendFile(b, "unix") })
460 }
461
462 func benchmarkSendFile(b *testing.B, proto string) {
463 for i := 0; i <= 10; i++ {
464 size := 1 << (i + 10)
465 bench := sendFileBench{
466 proto: proto,
467 chunkSize: size,
468 }
469 b.Run(strconv.Itoa(size), bench.benchSendFile)
470 }
471 }
472
473 type sendFileBench struct {
474 proto string
475 chunkSize int
476 }
477
478 func (bench sendFileBench) benchSendFile(b *testing.B) {
479 fileSize := b.N * bench.chunkSize
480 f := createTempFile(b, fileSize)
481
482 client, server := spawnTestSocketPair(b, bench.proto)
483 defer server.Close()
484
485 cleanUp, err := startTestSocketPeer(b, client, "r", bench.chunkSize, fileSize)
486 if err != nil {
487 client.Close()
488 b.Fatal(err)
489 }
490 defer cleanUp(b)
491
492 b.ReportAllocs()
493 b.SetBytes(int64(bench.chunkSize))
494 b.ResetTimer()
495
496
497 sent, err := io.Copy(server, f)
498 if err != nil {
499 b.Fatalf("failed to copy data with sendfile, error: %v", err)
500 }
501 if sent != int64(fileSize) {
502 b.Fatalf("bytes sent mismatch, got: %d, want: %d", sent, fileSize)
503 }
504 }
505
506 func createTempFile(b *testing.B, size int) *os.File {
507 f, err := os.CreateTemp(b.TempDir(), "sendfile-bench")
508 if err != nil {
509 b.Fatalf("failed to create temporary file: %v", err)
510 }
511 b.Cleanup(func() {
512 f.Close()
513 })
514
515 data := make([]byte, size)
516 if _, err := f.Write(data); err != nil {
517 b.Fatalf("failed to create and feed the file: %v", err)
518 }
519 if err := f.Sync(); err != nil {
520 b.Fatalf("failed to save the file: %v", err)
521 }
522 if _, err := f.Seek(0, io.SeekStart); err != nil {
523 b.Fatalf("failed to rewind the file: %v", err)
524 }
525
526 return f
527 }
528
View as plain text