Skip to content

Commit

Permalink
server (ticdc): add timeout for statusServer. (#5332)
Browse files Browse the repository at this point in the history
close #5303
  • Loading branch information
asddongmen committed May 6, 2022
1 parent 6f91a84 commit 063b528
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
5 changes: 4 additions & 1 deletion cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,17 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
return
}

// c does not have a cancel() func and its Done() method always return nil,
// so we should not use c as a context.
// Ref:https://github.com/gin-gonic/gin/blob/92eeaa4ebbadec2376e2ca5f5749888da1a42e24/context.go#L1157
ctx := c.Request.Context()
var changefeedConfig model.ChangefeedConfig
if err := c.BindJSON(&changefeedConfig); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err))
return
}

info, err := verifyCreateChangefeedConfig(c, changefeedConfig, h.capture)
info, err := verifyCreateChangefeedConfig(ctx, changefeedConfig, h.capture)
if err != nil {
_ = c.Error(err)
return
Expand Down
20 changes: 18 additions & 2 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand All @@ -53,6 +54,10 @@ const (
defaultDataDir = "/tmp/cdc_data"
// dataDirThreshold is used to warn if the free space of the specified data-dir is lower than it, unit is GB
dataDirThreshold = 500
// maxHTTPConnection is used to limits the max concurrent connections of http server.
maxHTTPConnection = 1000
// httpConnectionTimeout is used to limits a connection max alive time of http server.
httpConnectionTimeout = 10 * time.Minute
)

// Server is the capture server
Expand Down Expand Up @@ -166,8 +171,14 @@ func (s *Server) Run(ctx context.Context) error {

// startStatusHTTP starts the HTTP server.
// `lis` is a listener that gives us plain-text HTTP requests.
// TODO can we decouple the HTTP server from the capture server?
// TODO: can we decouple the HTTP server from the capture server?
func (s *Server) startStatusHTTP(lis net.Listener) error {
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener. Connections that exceed the
// limit will wait in a queue and no new goroutines will be created until
// a connection is processed.
// We use it here to limit the max concurrent conections of statusServer.
lis = netutil.LimitListener(lis, maxHTTPConnection)
conf := config.GetGlobalServerConfig()

// discard gin log output
Expand All @@ -177,7 +188,12 @@ func (s *Server) startStatusHTTP(lis net.Listener) error {
RegisterRoutes(router, s.capture, registry)

// No need to configure TLS because it is already handled by `s.tcpServer`.
s.statusServer = &http.Server{Handler: router}
// Add ReadTimeout and WriteTimeout to avoid some abnormal connections never close.
s.statusServer = &http.Server{
Handler: router,
ReadTimeout: httpConnectionTimeout,
WriteTimeout: httpConnectionTimeout,
}

go func() {
log.Info("http server is running", zap.String("addr", conf.Addr))
Expand Down
8 changes: 8 additions & 0 deletions cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,14 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

// we must close adminClient when this func return cause by an error
// otherwise the adminClient will never be closed and lead to an goroutine leak
defer func() {
if err != nil {
adminClient.Close()
}
}()

if err := kafka.AdjustConfig(adminClient, baseConfig, saramaConfig, topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
Expand Down

0 comments on commit 063b528

Please sign in to comment.