|
6 | 6 | package net
|
7 | 7 |
|
8 | 8 | import (
|
| 9 | + "bufio" |
9 | 10 | "bytes"
|
10 | 11 | "compress/gzip"
|
11 | 12 | "context"
|
@@ -99,6 +100,65 @@ func mockRateLimitHandler(w http.ResponseWriter, _ *http.Request) {
|
99 | 100 | http.Error(w, "Too Many Requests", HTTPStatusTooManyRequests)
|
100 | 101 | }
|
101 | 102 |
|
| 103 | +// pipeServer acts like an http server with a pipe-based transport |
| 104 | +type pipeServer struct { |
| 105 | + handler http.Handler |
| 106 | + conns chan connPair |
| 107 | + done chan struct{} |
| 108 | +} |
| 109 | + |
| 110 | +type connPair struct { |
| 111 | + client, server net.Conn |
| 112 | +} |
| 113 | + |
| 114 | +func newPipeServer(h http.HandlerFunc) *pipeServer { |
| 115 | + ph := &pipeServer{ |
| 116 | + handler: h, |
| 117 | + conns: make(chan connPair), |
| 118 | + done: make(chan struct{}), |
| 119 | + } |
| 120 | + go ph.serve() |
| 121 | + return ph |
| 122 | +} |
| 123 | + |
| 124 | +func (ph *pipeServer) serve() { |
| 125 | + for { |
| 126 | + select { |
| 127 | + case <-ph.done: |
| 128 | + return |
| 129 | + case pair := <-ph.conns: |
| 130 | + go ph.handleConn(pair.server) |
| 131 | + } |
| 132 | + } |
| 133 | +} |
| 134 | + |
| 135 | +func (ph *pipeServer) handleConn(conn net.Conn) { |
| 136 | + defer conn.Close() |
| 137 | + // Convert the connection into a response writer and request |
| 138 | + req, err := http.ReadRequest(bufio.NewReader(conn)) |
| 139 | + if err != nil { |
| 140 | + return |
| 141 | + } |
| 142 | + w := httptest.NewRecorder() |
| 143 | + ph.handler.ServeHTTP(w, req) |
| 144 | + w.Result().Write(conn) |
| 145 | +} |
| 146 | + |
| 147 | +func (ph *pipeServer) close() { |
| 148 | + close(ph.done) |
| 149 | +} |
| 150 | + |
| 151 | +// Transport returns an http.Transport that dials through pipes to this handler |
| 152 | +func (ph *pipeServer) Transport() *http.Transport { |
| 153 | + return &http.Transport{ |
| 154 | + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { |
| 155 | + client, server := net.Pipe() |
| 156 | + ph.conns <- connPair{client: client, server: server} |
| 157 | + return client, nil |
| 158 | + }, |
| 159 | + } |
| 160 | +} |
| 161 | + |
102 | 162 | // Test Suite
|
103 | 163 |
|
104 | 164 | func TestSendJSONRequest(t *testing.T) {
|
@@ -425,58 +485,22 @@ func TestSendRequestWithCustomHeaders(t *testing.T) {
|
425 | 485 |
|
426 | 486 | func TestSendRequestWithTimeout(t *testing.T) {
|
427 | 487 | synctest.Run(func() {
|
428 |
| - // Create a channel to coordinate between the dialer and server |
429 |
| - type connPair struct { |
430 |
| - client, server net.Conn |
| 488 | + // Create a handler that simulates a slow response |
| 489 | + slowHandler := func(w http.ResponseWriter, r *http.Request) { |
| 490 | + time.Sleep(5 * time.Second) |
| 491 | + w.Header().Set("Content-Type", "application/json") |
| 492 | + w.WriteHeader(http.StatusOK) |
| 493 | + w.Write([]byte("{}")) |
431 | 494 | }
|
432 |
| - newConn := make(chan connPair) |
433 | 495 |
|
434 |
| - // Create a custom transport that creates a new pipe for each request |
435 |
| - transport := &http.Transport{ |
436 |
| - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { |
437 |
| - // Create a new pipe for each connection |
438 |
| - srvConn, cliConn := net.Pipe() |
439 |
| - |
440 |
| - // Send the server connection to be handled |
441 |
| - newConn <- connPair{client: cliConn, server: srvConn} |
442 |
| - |
443 |
| - return cliConn, nil |
444 |
| - }, |
445 |
| - } |
| 496 | + // Create our pipe-based handler |
| 497 | + ph := newPipeServer(slowHandler) |
| 498 | + defer ph.close() |
446 | 499 |
|
447 | 500 | handler := NewRequestHandler()
|
448 |
| - handler.Client.Transport = transport |
| 501 | + handler.Client.Transport = ph.Transport() |
449 | 502 | handler.Client.Timeout = 2 * time.Second
|
450 | 503 |
|
451 |
| - // Channel to signal the simulated server goroutine to stop |
452 |
| - done := make(chan struct{}) |
453 |
| - defer close(done) |
454 |
| - |
455 |
| - // Start a goroutine that simulates all server connections |
456 |
| - go func() { |
457 |
| - for { |
458 |
| - select { |
459 |
| - case <-done: |
460 |
| - return |
461 |
| - case pair := <-newConn: |
462 |
| - // Handle each new connection in its own goroutine |
463 |
| - go func(srvConn net.Conn) { |
464 |
| - defer srvConn.Close() |
465 |
| - |
466 |
| - // Sleep to simulate slow processing |
467 |
| - time.Sleep(5 * time.Second) |
468 |
| - |
469 |
| - w := httptest.NewRecorder() |
470 |
| - w.Header().Set("Content-Type", "application/json") |
471 |
| - w.WriteHeader(http.StatusOK) |
472 |
| - w.Write([]byte("{}")) |
473 |
| - |
474 |
| - w.Result().Write(srvConn) |
475 |
| - }(pair.server) |
476 |
| - } |
477 |
| - } |
478 |
| - }() |
479 |
| - |
480 | 504 | config := RequestConfig{
|
481 | 505 | Method: "GET",
|
482 | 506 | URL: "http://test-server",
|
|
0 commit comments