fiber: πŸ› Streaming with SetBodyStreamWriter results in deadlock

Fiber version 2.6.0 Issue description Using SetBodyStreamWriter leads to a deadlock: only 5 items are being generated, but the client which makes the call does not receive anything; trying to gracefully shutdown the application does not work either - it waits indefinitely for the connections to be closed; ReadTimeout does not seem to have any effect. I included the full code of a small demo application - once running just visit http://localhost:8080/v1/streaming from a browser / tool and you will be able to see the behavior described.

Code snippet

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/cors"
	"github.com/segmentio/encoding/json"
)

var app *fiber.App

func main() {

	app = fiber.New(fiber.Config{
		Prefork:          false,
		DisableKeepalive: true,
		JSONEncoder:      json.Marshal,
		ReadTimeout:      time.Second * 20,
	})
	app.Use(cors.New())

	// send runtime data to chart
	app.Get("/v1/streaming", StreamingHandler)

	// Listen from a different goroutine
	go func() {
		if err := app.Listen(":8080"); err != nil {
			log.Panic(err)
		}
	}()

	c := make(chan os.Signal, 1)                    // Create channel to signify a signal being sent
	signal.Notify(c, os.Interrupt, syscall.SIGTERM) // When an interrupt or termination signal is sent, notify the channel

	_ = <-c // This blocks the main thread until an interrupt is received
	fmt.Println("Gracefully shutting down...")
	_ = app.Shutdown()

	fmt.Println("Running cleanup tasks...")

	// Your cleanup tasks go here
	// db.Close()
	// redisConn.Close()
}

type dataPoint struct {
	X string
	Y int64
}

// StreamingHandler ...
func StreamingHandler(c *fiber.Ctx) error {
	// Test using: curl -iv http://localhost:8080/v1/streaming
	q := make(chan error, 1)
	c.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
		ticker := time.NewTicker(1500 * time.Millisecond)
		defer ticker.Stop()

		for range ticker.C {
			// Create data
			resp := dataPoint{
				X: "aa",
				Y: 12546,
			}

			// Convert to json
			b, _ := json.Marshal(resp)
			cb := new(bytes.Buffer)
			json.Compact(cb, b)
			b = cb.Bytes()
			log.Println(string(append(b, "\n"...)))

			// Send data
			_, err := w.Write(append(b, "\n"...))
			if err != nil {
				q <- err
				return
			}

			err = w.Flush()
			if err != nil {
				q <- err
				return
			}
		}
	})
	return <-q
}

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

That did it! Thank you - I really have to brush up my documentation reading !

You should return nil instead of return <-q,otherwise it will block. Why is it always 5 items? For streamWriter fasthttp will create PipeConns like this:

func NewPipeConns() *PipeConns {
	ch1 := make(chan *byteBuffer, 4)
	ch2 := make(chan *byteBuffer, 4)
        ....
}