Timeout stream response connection does not clear buffer data for re-use #1743

Closed
TAYTS opened this issue Mar 31, 2024 · 3 comments · Fixed by #1757

Comments


TAYTS commented Mar 31, 2024

I am using fasthttp as a proxy server that calls a downstream service returning a streamed response. When the downstream request times out before the stream response body has been fully returned, the next request fails to parse the response header, because the clientConn is re-used and still carries data from the previous response.

In the default transport's RoundTrip(), the close function of resp.bodyStream only calls hc.releaseConn(cc) instead of closing the conn. For stream responses, is it possible to close the TCP connection when it times out, to prevent dirty data from the previous request? Maybe we could change newCloseReader's close function to check whether the read timed out or hit some other issue, so the connection is cleaned up properly?

resp.bodyStream = newCloseReader(rbs, func() error {
	hc.releaseReader(br)
	if r, ok := rbs.(*requestStream); ok {
		releaseRequestStream(r)
	}
	if closeConn || resp.ConnectionClose() {
		hc.closeConn(cc)
	} else {
		hc.releaseConn(cc)
	}
	return nil
})

How to reproduce:

package main

import (
	"fmt"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	var lbc = fasthttp.LBClient{
		Clients: make([]fasthttp.BalancingClient, 0),
		Timeout: 1 * time.Second,
	}

	downStreamAdd := "127.0.0.1:3000"
	lbc.Clients = append(lbc.Clients, &fasthttp.HostClient{
		Addr:               downStreamAdd,
		StreamResponseBody: true,
		// Disable retries so the timeout surfaces directly.
		RetryIf: func(request *fasthttp.Request) bool {
			return false
		},
	})

	server := fasthttp.Server{
		Handler: func(ctx *fasthttp.RequestCtx) {
			err := lbc.Do(&ctx.Request, &ctx.Response)
			fmt.Println(err)
		},
	}

	if err := server.ListenAndServe("localhost:8080"); err != nil {
		fmt.Println("fail to start server", err.Error())
	}
}
  1. Create a stream response service that takes more than 1 second to return its data
  2. Make another request after the timeout
  3. Receive an error message saying the response header could not be read:
fail to make request: error when reading response headers: cannot find whitespace in the first line of response "2\r\n9\n\r\n3\r\n10\n\r\n3\r\n11\n\r\n3\r\n12\n\r\n3\r\n13\n\r\n3\r\n14\n\r\n3\r\n15\n\r\n3\r\n16\n\r\n3\r\n17\n\r\n3\r\n18\n\r\n3\r\n19\n\r\n3\r\n20\n\r\n3\r\n21\n\r\n3\r\n22\n\r\n3\r\n23\n\r\n3\r\n24\n\r\n3\r\n25\n\r\n3\r\n26\n\r\n3\r\n27\n\r\n3\r\n28\n\r\n0\r\n\r\n". Buffer size=164, contents: "2\r\n9\n\r\n3\r\n10\n\r\n3\r\n11\n\r\n3\r\n12\n\r\n3\r\n13\n\r\n3\r\n14\n\r\n3\r\n15\n\r\n3\r\n16\n\r\n3\r\n17\n\r\n3\r\n18\n\r\n3\r\n19\n\r\n3\r\n20\n\r\n3\r\n21\n\r\n3\r\n22\n\r\n3\r\n23\n\r\n3\r\n24\n\r\n3\r\n25\n\r\n3\r\n26\n\r\n3\r\n27\n\r\n3\r\n28\n\r\n0\r\n\r\n"

I tried a dirty hack that always closes the clientConn when cleaning up the response.bodyStream, but I am not sure whether there is a better way of doing it, or whether this is a bug or I am misusing the API.

TAYTS closed this as completed Mar 31, 2024
TAYTS reopened this Mar 31, 2024

TAYTS (Author) commented Apr 1, 2024

Proposed solution:

resp.bodyStream = newCloseReader(rbs, func() error {
	hc.releaseReader(br)
	if r, ok := rbs.(*requestStream); ok {
		releaseRequestStream(r)
	}
	// Also close the connection if its last use time is after the deadline,
	// so a timed-out stream is never released back to the pool first.
	if closeConn || resp.ConnectionClose() || cc.lastUseTime.After(deadline) {
		hc.closeConn(cc)
	} else {
		hc.releaseConn(cc)
	}
	return nil
})

TAYTS changed the title from "Timeout stream response re-use timeout connection" to "Timeout stream response connection being used on next request" Apr 3, 2024
TAYTS changed the title from "Timeout stream response connection being used on next request" to "Timeout stream response connection does not clear buffer data for re-use" Apr 3, 2024

mdenushev (Contributor) commented Apr 11, 2024

Same here. Example Python upstream:

from gevent import monkey; monkey.patch_all()
import gevent

from bottle import route, run

i = 0

@route('/stream')
def stream():
    global i
    i += 1
    yield str(i) + 'part1' + '.' * 30000
    gevent.sleep(60)
    yield str(i) + 'part2' + '.' * 30000
    gevent.sleep(60)
    yield str(i) + 'part3' + '.' * 30000

run(host='0.0.0.0', port=8080, server='gevent')

It looks like when a clientConn is re-used after a timeout while writing the response body stream, the next request on the same conn can read bytes from the first response body.
Example with additional logs:

got conn 127.0.0.1:58268
buffered: 0
<nil>
write body stream before close err: read tcp4 127.0.0.1:58268->127.0.0.1:3002: i/o timeout
2024-04-11 16:45:20.005816 +0300 MSK m=+3.055472251 released conn
2024/04/11 16:45:20 error when serving connection "127.0.0.1:9001"<->"127.0.0.1:58267": read tcp4 127.0.0.1:58268->127.0.0.1:3002: i/o timeout
got conn 127.0.0.1:58268
got buffer starting with buffered count 0
got buffer after reset with buffered count 0
buffered: 0
error when reading response headers: cannot find whitespace in the first line of response "7536\r\n1part1.................

mdenushev pushed a commit to mdenushev/fasthttp that referenced this issue Apr 12, 2024
erikdubbelboer pushed a commit that referenced this issue Apr 22, 2024
* fix: propagate body stream error to close function (#1743)

* fix: http test

* fix: close body stream with error in encoding functions

* fix: lint

---------

Co-authored-by: Max Denushev <denushev@tochka.com>
erikdubbelboer pushed a commit that referenced this issue May 2, 2024
* fix: propagate body stream error to close function (#1743)

* feat: add address in ErrDialTimeout

* feat: add address in any `tryDial` error

* feat: use struct to wrap error with upstream info

* fix: lint

* fix: wrapped Error() method

* docs: add example to ErrDialWithUpstream

* feat: add address in ErrDialTimeout

* feat: add address in any `tryDial` error

* feat: use struct to wrap error with upstream info

* fix: lint

* fix: wrapped Error() method

* docs: add example to ErrDialWithUpstream

* docs: fix example for ErrDialWithUpstream

---------

Co-authored-by: Max Denushev <denushev@tochka.com>

makasim commented Sep 24, 2024

I was able to catch the same error on fasthttp v1.55.0. It is a kind of API gateway where the server is the standard library HTTP server and the upstream client is fasthttp (I am trying to migrate to it). I have been trying to reproduce the issue, but without luck so far. Any hints or ideas on how to reproduce it would be much appreciated.

For starters, I am posting snippets of the code where I got the error. I hope it helps.

new client:

func NewFastClient() *fasthttp.Client {
	return &fasthttp.Client{
		NoDefaultUserAgentHeader: true,
		MaxConnsPerHost:          50000,
		MaxIdleConnDuration:      time.Second * 50,
		ReadTimeout:              time.Second * 50,
		WriteTimeout:             time.Second * 50,
		StreamResponseBody:       true,
	}
}

serve (stripped version):

fReq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(fReq)

fReq.Header.DisableSpecialHeader()
fReq.SetRequestURI(targetURL)
fReq.SetBody(ctx.RequestBody)
fReq.Header.SetMethod(method)

// set X-Forwarded-Host to public host name
fReq.Header.Set("X-Forwarded-Host", publicHost)
fReq.Header.Set("Connection", "keep-alive")
fReq.Header.Set("Host", upstreamHost)
fReq.Header.Add("Via", requestProto+" api-gateway")

fResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(fResp)

fResp.StreamBody = true

if err := h.httpClient.DoTimeout(fReq, fResp, time.Second*50); isTimeout(err) {
  http.Error(downResp, `gateway timeout`, http.StatusGatewayTimeout)
  return
} else if err != nil {
  l.Error("http client: do failed", logger.Error(err), "err_code", errCode)
  
  //Here I caught the error:
  // error when reading response headers: cannot find whitespace in the first line of response
  http.Error(downResp, `bad gateway`, http.StatusBadGateway)
  return
}
defer func() {
  if err := fResp.CloseBodyStream(); err != nil {
    l.Error("failed to close response body", logger.Error(err))
  }
}()

fResp.Header.VisitAll(func(key, value []byte) {
  downResp.Header().Add(string(key), string(value))
})
downResp.WriteHeader(fResp.StatusCode())

if stream := fResp.BodyStream(); stream != nil {
  if _, err := io.Copy(downResp.Body, stream); err != nil {
    return fmt.Errorf("write response: %w", err)
  }
}


3 participants