Introduction

io.Pipe() creates a synchronous in-memory pipe with no internal buffering: each Read blocks until a Write supplies data, and each Write blocks until one or more Reads have consumed all of its data. Because both calls block, using a pipe from a single goroutine causes a deadlock: the Write waits for a Read that never happens, or the Read waits for a Write that never runs. Pipes need concurrent goroutines, one to write and one to read. The writer must close its end to signal end of data, and a reader that stops early must close its end so the writer is not left blocked and leaked.

Symptoms

```
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
io.(*Pipe).Write(...)
	/usr/local/go/src/io/pipe.go:86
# Main goroutine blocked on write
```

Or a goroutine leak:

```bash
# Pipe writer goroutine is still blocked in Write after the reader exited
# io.Pipe has no buffer, so the goroutine and everything it references are never released
```

Common Causes

  • Read and write in the same goroutine: a single goroutine cannot be blocked in Write and call Read at the same time
  • Pipe write end not closed: the reader blocks forever waiting for more data
  • Reader exits without closing its end: the writer blocks forever on its next Write
  • Error on one end not propagated: the writer fails but calls Close instead of CloseWithError, so the reader sees a clean EOF
  • Pipe used in an HTTP handler without a goroutine: the handler blocks on the first pipe Write
  • io.Copy never returns: it reads from a pipe whose write end is never closed, or writes to a pipe that nothing reads

Step-by-Step Fix

Step 1: Use separate goroutines for pipe operations

```go
import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

func pipeExample() error {
	pr, pw := io.Pipe()
	// Closing the read end on return unblocks the writer if we bail out early.
	defer pr.Close()

	// Writer goroutine
	go func() {
		defer pw.Close() // Signal EOF to reader; does not overwrite an earlier CloseWithError

		encoder := json.NewEncoder(pw)
		for i := 0; i < 100; i++ {
			if err := encoder.Encode(map[string]int{"value": i}); err != nil {
				pw.CloseWithError(err) // Signal error to reader
				return
			}
		}
	}()

	// Reader (calling goroutine)
	decoder := json.NewDecoder(pr)
	for {
		var data map[string]int
		if err := decoder.Decode(&data); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return err
		}
		fmt.Println(data)
	}

	return nil
}
```

Step 2: Stream HTTP response body through pipe

```go
func streamToHTTP(w http.ResponseWriter, r *http.Request) {
	pr, pw := io.Pipe()
	// If the client disconnects and io.Copy fails, closing the read end makes
	// the writer's next Write return an error instead of blocking forever.
	defer pr.Close()

	// Set content type and start streaming.
	// The status is sent before any data exists, so later errors cannot change it.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	// Write data in background
	go func() {
		defer pw.Close()

		items, err := fetchAllItems()
		if err != nil {
			pw.CloseWithError(err)
			return
		}

		encoder := json.NewEncoder(pw)
		for _, item := range items {
			if err := encoder.Encode(item); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
	}()

	// Copy pipe to HTTP response
	if _, err := io.Copy(w, pr); err != nil {
		log.Printf("Stream error: %v", err)
	}
}
```

Step 3: Handle pipe errors correctly

```go
type pipeResult struct {
	err error
}

func safePipe() error {
	pr, pw := io.Pipe()
	done := make(chan pipeResult, 1) // Buffered so the writer never blocks on send

	go func() {
		var result pipeResult

		// Write data
		_, err := generateLargeData(pw)
		if err != nil {
			pw.CloseWithError(err)
		} else {
			err = pw.Close()
		}
		result.err = err
		done <- result
	}()

	// Read from pipe
	_, err := io.Copy(os.Stdout, pr)
	if err != nil {
		// Close the read end so a writer blocked in Write returns instead of leaking.
		pr.CloseWithError(err)
		return err
	}

	// Wait for writer to finish
	result := <-done
	return result.err
}
```

Prevention

  • Always run the pipe writer in a separate goroutine from the reader
  • Use defer pw.Close() in the writer to signal EOF
  • Use pw.CloseWithError(err) to propagate errors to the reader
  • Close the read end (pr.Close() or pr.CloseWithError(err)) when the reader stops early, so the writer is not left blocked
  • Use io.Copy to stream pipe contents to destinations
  • io.Pipe has no deadline support; for long-running operations, watch a context and close one end with CloseWithError(ctx.Err()) when it is done
  • Test pipe code with the race detector (go test -race) to catch concurrent access issues