Using atomic instead of mutex and delete scratch slice #1833

Merged: 13 commits merged into valyala:master on Aug 25, 2024

Conversation

@NikoMalik
Contributor

No description provided.

@newacorn
Contributor

Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.
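
For readers following along, here is a minimal, self-contained sketch of the two approaches being compared: a slice used as a LIFO stack and guarded by a mutex, versus a lock-free linked list (Treiber stack) whose head is updated with atomic compare-and-swap. This is illustrative only, not the actual fasthttp workerpool code; all names are made up.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// workerChan stands in for fasthttp's per-worker channel wrapper.
type workerChan struct {
	ch chan struct{}
}

// mutexStack is the slice-plus-lock approach: a LIFO stack guarded by a sync.Mutex.
type mutexStack struct {
	mu    sync.Mutex
	ready []*workerChan
}

func (s *mutexStack) push(wc *workerChan) {
	s.mu.Lock()
	s.ready = append(s.ready, wc)
	s.mu.Unlock()
}

func (s *mutexStack) pop() *workerChan {
	s.mu.Lock()
	defer s.mu.Unlock()
	n := len(s.ready)
	if n == 0 {
		return nil
	}
	wc := s.ready[n-1]
	s.ready[n-1] = nil // drop the reference so the entry can be collected
	s.ready = s.ready[:n-1]
	return wc
}

// atomicStack is the lock-free alternative: a linked list whose head is
// swapped with compare-and-swap instead of being protected by a mutex.
type node struct {
	wc   *workerChan
	next *node
}

type atomicStack struct {
	head atomic.Pointer[node]
}

func (s *atomicStack) push(wc *workerChan) {
	n := &node{wc: wc}
	for {
		old := s.head.Load()
		n.next = old
		if s.head.CompareAndSwap(old, n) {
			return
		}
	}
}

func (s *atomicStack) pop() *workerChan {
	for {
		old := s.head.Load()
		if old == nil {
			return nil
		}
		// Fresh nodes are allocated on every push, so the classic ABA hazard is
		// largely avoided here; production code still needs careful review.
		if s.head.CompareAndSwap(old, old.next) {
			return old.wc
		}
	}
}

func main() {
	ms, as := &mutexStack{}, &atomicStack{}
	ms.push(&workerChan{ch: make(chan struct{}, 1)})
	as.push(&workerChan{ch: make(chan struct{}, 1)})
	fmt.Println(ms.pop() != nil, as.pop() != nil) // prints: true true
}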

@NikoMalik
Contributor Author

> Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.

I got a speedup of almost a factor of 1.5 in benchmark tests.

@erikdubbelboer
Collaborator

Can you show which benchmarks you ran and their results here?

@NikoMalik
Contributor Author

func BenchmarkWorkerPoolStartStopSerial(b *testing.B) {
	for i := 0; i < b.N; i++ {
		testWorkerPoolStartStopBENCH()
	}
}

func BenchmarkWorkerPoolStartStopConcurrent(b *testing.B) {
	concurrency := 10
	ch := make(chan struct{}, concurrency)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < concurrency; j++ {
			go func() {
				testWorkerPoolStartStopBENCH()
				ch <- struct{}{}
			}()
		}
		for j := 0; j < concurrency; j++ {
			select {
			case <-ch:
			case <-time.After(time.Second):
				b.Fatalf("timeout")
			}
		}
	}
}

func BenchmarkWorkerPoolMaxWorkersCountSerial(b *testing.B) {
	for i := 0; i < b.N; i++ {
		testWorkerPoolMaxWorkersCountMultiBENCH(b)
	}
}

func BenchmarkWorkerPoolMaxWorkersCountConcurrent(b *testing.B) {
	concurrency := 4
	ch := make(chan struct{}, concurrency)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < concurrency; j++ {
			go func() {
				testWorkerPoolMaxWorkersCountMultiBENCH(b)
				ch <- struct{}{}
			}()
		}
		for j := 0; j < concurrency; j++ {
			select {
			case <-ch:
			case <-time.After(time.Second * 2):
				b.Fatalf("timeout")
			}
		}
	}
}

func testWorkerPoolStartStopBENCH() {
	wp := &workerPool{
		WorkerFunc:      func(conn net.Conn) error { return nil },
		MaxWorkersCount: 10,
		Logger:          defaultLogger,
	}
	for i := 0; i < 10; i++ {
		wp.Start()
		wp.Stop()
	}
}

func testWorkerPoolMaxWorkersCountMultiBENCH(b *testing.B) {
	for i := 0; i < 5; i++ {
		testWorkerPoolMaxWorkersCountBENCH(b)
	}
}

func testWorkerPoolMaxWorkersCountBENCH(b *testing.B) {
	ready := make(chan struct{})
	wp := &workerPool{
		WorkerFunc: func(conn net.Conn) error {
			buf := make([]byte, 100)
			n, err := conn.Read(buf)
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			buf = buf[:n]
			if string(buf) != "foobar" {
				b.Errorf("unexpected data read: %q. Expecting %q", buf, "foobar")
			}
			if _, err = conn.Write([]byte("baz")); err != nil {
				b.Errorf("unexpected error: %v", err)
			}

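			// Block here so every worker stays busy until "ready" is closed after the pool-full checks below.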
			<-ready

			return nil
		},
		MaxWorkersCount: 10,
		Logger:          defaultLogger,
		connState:       func(net.Conn, ConnState) {},
	}
	wp.Start()

	ln := fasthttputil.NewInmemoryListener()

	clientCh := make(chan struct{}, wp.MaxWorkersCount)
	for i := 0; i < wp.MaxWorkersCount; i++ {
		go func() {
			conn, err := ln.Dial()
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			if _, err = conn.Write([]byte("foobar")); err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			data, err := io.ReadAll(conn)
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			if string(data) != "baz" {
				b.Errorf("unexpected value read: %q. Expecting %q", data, "baz")
			}
			if err = conn.Close(); err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			clientCh <- struct{}{}
		}()
	}

	for i := 0; i < wp.MaxWorkersCount; i++ {
		conn, err := ln.Accept()
		if err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
		if !wp.Serve(conn) {
			b.Fatalf("worker pool must have enough workers to serve the conn")
		}
	}

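	// Dial one extra connection beyond MaxWorkersCount; Serve must reject it while all workers are blocked.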
	go func() {
		if _, err := ln.Dial(); err != nil {
			b.Errorf("unexpected error: %v", err)
		}
	}()
	conn, err := ln.Accept()
	if err != nil {
		b.Fatalf("unexpected error: %v", err)
	}
	for i := 0; i < 5; i++ {
		if wp.Serve(conn) {
			b.Fatalf("worker pool must be full")
		}
	}
	if err = conn.Close(); err != nil {
		b.Fatalf("unexpected error: %v", err)
	}

	close(ready)

	for i := 0; i < wp.MaxWorkersCount; i++ {
		select {
		case <-clientCh:
		case <-time.After(time.Second):
			b.Fatalf("timeout")
		}
	}

	if err := ln.Close(); err != nil {
		b.Fatalf("unexpected error: %v", err)
	}
	wp.Stop()
}

I used these benchmarks, adapted from the existing tests.
Results using atomic:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  216567              5454 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35728             31353 ns/op           17913 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2488            523983 ns/op          253846 B/op       1143 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1339            811728 ns/op         1019013 B/op       4624 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.554s

Results using mutex and slice:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  211056              5594 ns/op            1508 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35869             32778 ns/op           18003 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2433            537527 ns/op          256182 B/op       1093 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1458            876898 ns/op         1027666 B/op       4420 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.788s

@erikdubbelboer
Collaborator

Any idea what is causing the extra allocations?

This version exits the loop immediately when the maximum number of workers is reached, rather than creating a new worker once the limit is hit. This reduces unnecessary operations and potential lock contention in sync.Pool.
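
A sketch of the idea described above (illustrative only, not the actual fasthttp workerpool code; all names are hypothetical): reserve a worker slot with an atomic counter and bail out before ever touching sync.Pool, so a full pool costs neither an allocation nor any pool-internal locking.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type worker struct{ ch chan struct{} }

type pool struct {
	maxWorkers   int32
	workersCount atomic.Int32
	workerPool   sync.Pool
}

func newPool(max int32) *pool {
	p := &pool{maxWorkers: max}
	p.workerPool.New = func() any { return &worker{ch: make(chan struct{}, 1)} }
	return p
}

// getWorker reserves a slot first; only when the reservation succeeds does it
// pay for sync.Pool.Get (and a possible allocation).
func (p *pool) getWorker() *worker {
	if p.workersCount.Add(1) > p.maxWorkers {
		p.workersCount.Add(-1) // over the limit: back out and report "full"
		return nil
	}
	return p.workerPool.Get().(*worker)
}

// release returns a worker to the pool and frees its slot.
func (p *pool) release(w *worker) {
	p.workerPool.Put(w)
	p.workersCount.Add(-1)
}

func main() {
	p := newPool(1)
	w := p.getWorker()
	fmt.Println(w != nil, p.getWorker() == nil) // true true: second call hits the limit
	p.release(w)
}
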
@NikoMalik
Contributor Author

NikoMalik commented Aug 22, 2024

> Any idea what is causing the extra allocations?

I fixed it:

cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  205198              5466 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   34980             30404 ns/op           17959 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2520            509416 ns/op          251338 B/op       1050 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1652            782699 ns/op         1008180 B/op       4218 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.588s

The problem was in getCh().

@erikdubbelboer
Collaborator

Our linter would like to see some changes.

NikoMalik and others added 4 commits on August 24, 2024 at 10:26 (co-authored by Erik Dubbelboer).
@erikdubbelboer
Collaborator

Seems like the code isn't completely thread-safe; 3 tests failed with the race detector.

@NikoMalik
Contributor Author

> Seems like the code isn't completely thread-safe; 3 tests failed with the race detector.

I think I've ruled out the last of the possible data races.

@erikdubbelboer merged commit 19c50cd into valyala:master on Aug 25, 2024
15 checks passed
@erikdubbelboer
Collaborator

Thanks!

newacorn added a commit to newacorn/fasthttp that referenced this pull request Aug 26, 2024
newacorn added a commit to newacorn/fasthttp that referenced this pull request Sep 7, 2024
erikdubbelboer pushed a commit that referenced this pull request Sep 10, 2024
* Remove a redundant field and clarify the comments.

* Revert "Using atomic instead of mutex and delete scratch slice (#1833)"

This reverts commit 19c50cd.