Introduction
io.Pipe() creates a synchronous, unbuffered in-memory pipe: each Write blocks until one or more Reads have fully consumed its data, and each Read blocks until a Write arrives or the write end is closed. Because both operations block, using a pipe from a single goroutine deadlocks: the write waits for a read that never happens, or the read waits for a write that never starts. Pipes require concurrent goroutines -- one to write and one to read -- and both ends must be closed properly to signal end of data and prevent goroutine leaks.
Symptoms
```
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
io.(*Pipe).Write(...)
	/usr/local/go/src/io/pipe.go:86
# Main goroutine blocked on write
```
Or goroutine leak:
# Pipe writer goroutine still running after reader exited
# Memory grows as blocked writer goroutines accumulate (io.Pipe has no internal buffer)
Common Causes
- Read and write in same goroutine: Single goroutine cannot read and write simultaneously
- Pipe write end not closed: Reader blocks forever waiting for more data
- Reader exits without consuming: Writer blocks on next write
- Error on one end not propagated: Write error not communicated to reader
- Pipe used in HTTP handler without goroutine: Handler blocks on pipe write
- io.Copy does not return: It reads from a pipe whose write end is never closed, or writes to a pipe that nothing reads from
Step-by-Step Fix
Step 1: Use separate goroutines for pipe operations
```go
// Requires: encoding/json, fmt, io
func pipeExample() error {
	pr, pw := io.Pipe()
	// Closing the read end on every return path unblocks the writer
	// if decoding fails before the stream is fully consumed.
	defer pr.Close()

	// Writer goroutine
	go func() {
		defer pw.Close() // Signal EOF to reader; has no effect if CloseWithError ran first

		encoder := json.NewEncoder(pw)
		for i := 0; i < 100; i++ {
			if err := encoder.Encode(map[string]int{"value": i}); err != nil {
				pw.CloseWithError(err) // Signal error to reader
				return
			}
		}
	}()

	// Reader (main goroutine)
	decoder := json.NewDecoder(pr)
	for {
		var data map[string]int
		if err := decoder.Decode(&data); err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		fmt.Println(data)
	}

	return nil
}
```
Step 2: Stream HTTP response body through pipe
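```go
// Requires: encoding/json, io, log, net/http
func streamToHTTP(w http.ResponseWriter, r *http.Request) {
	pr, pw := io.Pipe()
	// If the client disconnects and io.Copy fails, closing the read end makes
	// the writer's next Write return an error instead of blocking forever.
	defer pr.Close()

	// Set content type and start streaming.
	// The 200 status is sent before any data is fetched, so a later
	// error can only be logged, not turned into an error status.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	// Write data in background
	go func() {
		defer pw.Close()

		encoder := json.NewEncoder(pw)
		items, err := fetchAllItems()
		if err != nil {
			pw.CloseWithError(err)
			return
		}

		for _, item := range items {
			if err := encoder.Encode(item); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
	}()

	// Copy pipe to HTTP response
	if _, err := io.Copy(w, pr); err != nil {
		log.Printf("Stream error: %v", err)
	}
}
```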
Step 3: Handle pipe errors correctly
```go
// Requires: io, os
type pipeResult struct {
	err error
}

func safePipe() error {
	pr, pw := io.Pipe()
	done := make(chan pipeResult, 1) // buffered so the writer can always report

	go func() {
		var result pipeResult

		// Write data
		_, err := generateLargeData(pw)
		if err != nil {
			pw.CloseWithError(err)
		} else {
			err = pw.Close()
		}
		result.err = err
		done <- result
	}()

	// Read from pipe
	_, err := io.Copy(os.Stdout, pr)
	if err != nil {
		// Unblock the writer if the copy failed on the stdout side
		pr.CloseWithError(err)
		return err
	}

	// Wait for writer to finish
	result := <-done
	return result.err
}
```
Prevention
- Always run the pipe writer in a separate goroutine from the reader
- Use defer pw.Close() in the writer to signal EOF
- Use pw.CloseWithError(err) to propagate errors to the reader
- Never read and write to the same pipe in the same goroutine
- Use io.Copy to stream pipe contents to destinations
- Add timeouts with context for long-running pipe operations
- Test pipe code with race detector to catch concurrent access issues