Unable to write to streams buffer under bidi-stream #368

Closed
sneakstarberry opened this issue Aug 16, 2023 · 4 comments · Fixed by #536
Assignees
Labels
bug Something isn't working

Comments

@sneakstarberry

Describe the bug
While running a benchmark test, Zilla failed with an error.
The test opened 15 bidirectional-streaming connections, each sending 100 messages.
I used the ghz benchmark command-line tool for this test.
My ghz configuration is below.

{
    "insecure": true,
    "skipTLS": true,
    "proto": "./test.proto",
    "call": "test.TestService/SendTestData",
    "stream-interval": "5ms",
    "total": 15,
    "concurrency": 15,
    "data-file": "./messages.json",
    "metadata": {
        "trace_id": "{{.RequestNumber}}",
        "timestamp": "{{.TimestampUnix}}"
    },
    "host": "localhost:8082"
}

The total number of requests is 15 with a concurrency of 15, and the client sends 100 messages per request, so 1500 messages should be sent in total.
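The expected message count can be checked with a small sketch. It assumes, as described above, that every ghz request streams the full contents of messages.json once; the `expected_messages` helper is hypothetical and only illustrates the arithmetic.

```python
def expected_messages(config: dict, messages: list) -> int:
    """Total messages sent if each request streams every entry once (assumption)."""
    return config["total"] * len(messages)


# Values taken from the ghz configuration and the message generator in this issue.
config = {"total": 15, "concurrency": 15}
messages = [{"one": {"id": i}} for i in range(1, 101)]

print(expected_messages(config, messages))  # 1500
```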
The Zilla configuration file is almost the same as the grpc-kafka echo example. The proto file is not that important (I guess); it is just like EchoBidiStream.

name: gRPC-TEST
bindings:

# Gateway ingress config
  tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port: 8082
    exit: http_server
  http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: localhost:8082
        exit: grpc_server

# gRPC service definition
  grpc_server:
    type: grpc
    kind: server
    options:
      services:
        - proto/test.proto
    routes:
      - when:
          - method: test.TestService/*
        exit: grpc_kafka

# Proxy a gRPC service to a Kafka topic
  grpc_kafka:
    type: grpc-kafka
    kind: proxy
    options:
      correlation:
        headers:
          service: zilla:service
          method: zilla:method
          correlation-id: zilla:correlation-id
          reply-to: zilla:reply-to
    routes:
      - when:
          - method: test.TestService/*
        exit: kafka_cache_client
        with:
          capability: produce
          topic: echo-messages
          acks: leader_only
          reply-to: echo-messages

# Kafka caching layer
  kafka_cache_client:
    type: kafka
    kind: cache_client
    options:
      bootstrap:
        - echo-messages
    exit: kafka_cache_server
  kafka_cache_server:
    type: kafka
    kind: cache_server
    exit: kafka_client

# Connect to local Kafka
  kafka_client:
    type: kafka
    kind: client
    exit: kafka_tcp_client
  kafka_tcp_client:
    type: tcp
    kind: client
    options:
      host: kafka
      port: 9092
    routes:
      - when:
          - cidr: 0.0.0.0/0

To Reproduce
To reproduce this error, follow the steps below.

  1. Create your own grpc-kafka echo server configuration, or use the configuration above. (You can replace the .proto file with another echo proto file.)
  2. Install the ghz command-line tool.
  3. Use the ghz config file above (config.json), and create messages.json for the stream data. (You can use the Python code at the bottom of this issue.)
  4. Run ghz --config config.json
  5. You will then see one of the errors (Unable to write to streams buffer, or org.agrona.concurrent.AgentTerminationException: java.lang.IndexOutOfBoundsException: index=65519 length=179 capacity=65536).

Expected behavior
Even if it is slow, Zilla should process the requests correctly.

Desktop (please complete the following information):

  • OS: MacBook Pro 2019, Intel
  • Docker Desktop, resources: 8 CPUs, 12 GB memory, engine v20.10.21

Additional context

  1. Unable to write to streams buffer
2023-08-17 00:23:50 org.agrona.concurrent.AgentTerminationException: java.lang.IllegalStateException: Unable to write to streams buffer
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:681)
2023-08-17 00:23:50     at org.agrona.core/org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:291)
2023-08-17 00:23:50     at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
2023-08-17 00:23:50     at java.base/java.lang.Thread.run(Thread.java:833)
2023-08-17 00:23:50 Caused by: java.lang.IllegalStateException: Unable to write to streams buffer
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.stream.Target.handleWrite(Target.java:159)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.binding.grpc.kafka@0.9.51/io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream.GrpcKafkaProxyFactory.doData(GrpcKafkaProxyFactory.java:2140)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.binding.grpc.kafka@0.9.51/io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream.GrpcKafkaProxyFactory$KafkaProduceProxy.doKafkaData(GrpcKafkaProxyFactory.java:1538)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.binding.grpc.kafka@0.9.51/io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream.GrpcKafkaProxyFactory$GrpcProduceProxy.onGrpcData(GrpcKafkaProxyFactory.java:1172)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.binding.grpc.kafka@0.9.51/io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream.GrpcKafkaProxyFactory$GrpcProduceProxy.onGrpcMessage(GrpcKafkaProxyFactory.java:1086)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.binding.grpc.kafka@0.9.51/io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream.GrpcKafkaProxyFactory.lambda$newStream$1(GrpcKafkaProxyFactory.java:228)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:1038)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:1005)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
2023-08-17 00:23:50     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:675)
2023-08-17 00:23:50     ... 3 more
2023-08-17 00:23:50     Suppressed: java.lang.Exception: [engine/data#4]        [0x0404000000000029] streams=[consumeAt=0x0004c350 (0x000000000014c350), produceAt=0x000fff18 (0x00000000001fff18)]
2023-08-17 00:23:50             at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:679)
2023-08-17 00:23:50             ... 3 more
  2. IndexOutOfBoundsException
2023-08-17 00:23:33 org.agrona.concurrent.AgentTerminationException: java.lang.IndexOutOfBoundsException: index=65519 length=179 capacity=65536
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:681)
2023-08-17 00:23:33     at org.agrona.core/org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:291)
2023-08-17 00:23:33     at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
2023-08-17 00:23:33     at java.base/java.lang.Thread.run(Thread.java:833)
2023-08-17 00:23:33 Caused by: java.lang.IndexOutOfBoundsException: index=65519 length=179 capacity=65536
2023-08-17 00:23:33     at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.boundsCheck0(UnsafeBuffer.java:1692)
2023-08-17 00:23:33     at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.putBytes(UnsafeBuffer.java:945)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory$KafkaProduceStream$KafkaProduceClient.doEncodeRecordFin(KafkaClientProduceFactory.java:1735)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory.flushRecordContFin(KafkaClientProduceFactory.java:576)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory$KafkaProduceStream$KafkaProduceClient.flush(KafkaClientProduceFactory.java:1581)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory$KafkaProduceStream.onApplicationData(KafkaClientProduceFactory.java:979)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory$KafkaProduceStream.onApplication(KafkaClientProduceFactory.java:903)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.binding.kafka@0.9.51/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaClientProduceFactory.lambda$newStream$1(KafkaClientProduceFactory.java:257)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:1038)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:1005)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
2023-08-17 00:23:33     at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:675)
2023-08-17 00:23:33     ... 3 more
2023-08-17 00:23:33     Suppressed: java.lang.Exception: [engine/data#7]        [0x070700000000000b] streams=[consumeAt=0x0001f5f0 (0x000000000021f5f0), produceAt=0x00081080 (0x0000000000281080)]
2023-08-17 00:23:33             at io.aklivity.zilla.runtime.engine@0.9.51/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:679)
2023-08-17 00:23:33             ... 3 more
  3. Proto file
syntax = "proto3";
package test;

message TestRequest{
  message TestOneRequest {
    int64 id = 1;
  }

  message TestTwoRequest {
    string message = 1;
  }

  oneof message {
    TestOneRequest one = 1;
    TestTwoRequest two = 2;
  }
}

service TestService {
  rpc SendTestData(stream TestRequest) returns (stream TestRequest);
}

  4. Python code that creates the stream data (messages.json)
import json

message_list = []

for i in range(1, 101):
    message = {
        "one": {
            "id": i
        }
    }
    message_list.append(message)


json_messages = json.dumps(message_list, indent=4)

with open('messages.json', 'w') as file:
    file.write(json_messages)
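The numbers in the two stack traces above can be read with a little arithmetic. The sketch below is not taken from Zilla or Agrona: `overflow_bytes` and `backlog_bytes` are hypothetical helpers, and treating the parenthesized `consumeAt`/`produceAt` values as absolute byte positions is an assumption.

```python
def overflow_bytes(index: int, length: int, capacity: int) -> int:
    """How far a write of `length` bytes at `index` runs past `capacity` (0 if it fits)."""
    return max(0, index + length - capacity)


def backlog_bytes(consume_at: int, produce_at: int) -> int:
    """Bytes produced but not yet consumed, assuming both values are absolute positions."""
    return produce_at - consume_at


# IndexOutOfBoundsException: index=65519 length=179 capacity=65536
print(overflow_bytes(65519, 179, 65536))   # 162

# Suppressed exception from the "Unable to write to streams buffer" trace
print(backlog_bytes(0x14C350, 0x1FFF18))   # 736200

# Suppressed exception from the IndexOutOfBoundsException trace
print(backlog_bytes(0x21F5F0, 0x281080))   # 400016
```

Under these assumptions, the Kafka produce encoder tried to write 162 bytes past the end of a 64 KiB buffer, and in both traces the consumer was several hundred kilobytes behind the producer when the agent terminated.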
@jfallows
Contributor

@sneakstarberry thank you, we'll look into this.

@jfallows
Contributor

@sneakstarberry

We have incorporated the ghz benchmark tool into the README for the grpc.kafka.echo example in zilla-examples.

Please update your zilla-examples fork, and follow benchmark instructions at https://github.com/aklivity/zilla-examples/tree/main/grpc.kafka.echo#bench.

Then please update a branch on your fork with local modifications to demonstrate the reproducible test case and send us the link to the branch.

Many thanks in advance.

@jfallows jfallows self-assigned this Aug 18, 2023
@jfallows jfallows added the bug Something isn't working label Aug 18, 2023
@sneakstarberry
Author

sneakstarberry commented Aug 18, 2023

Hello! I reproduced the test case, forked the repository, and created a branch. The instructions for reproducing the error are at the bottom of README.md.
https://github.com/sneakstarberry/zilla-examples/tree/bug/ghz-benchmark-fail/grpc.kafka.echo

Is this all you need, or should I give you more information about the errors?

@sneakstarberry
Author

Modifying zilla.properties lets Zilla handle more requests, but it eventually fails.

zilla.engine.budgets.buffer.capacity=16777216
#zilla.engine.load.buffer.capacity=16777216
zilla.engine.streams.buffer.capacity=16777216
zilla.engine.command.buffer.capacity=16777216
zilla.engine.response.buffer.capacity=16777216
zilla.engine.counters.buffer.capacity=16777216
zilla.engine.buffer.slot.capacity=1048576
zilla.engine.routes.buffer.capacity=16777216
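For readability, the byte values above can be converted to MiB. This is a minimal sketch that parses the properties text shown in this comment; `parse_properties` is a hypothetical helper, not part of Zilla, and it assumes every uncommented value is an integer number of bytes.

```python
MIB = 1024 * 1024

# The properties from this comment, copied verbatim.
PROPERTIES = """\
zilla.engine.budgets.buffer.capacity=16777216
#zilla.engine.load.buffer.capacity=16777216
zilla.engine.streams.buffer.capacity=16777216
zilla.engine.command.buffer.capacity=16777216
zilla.engine.response.buffer.capacity=16777216
zilla.engine.counters.buffer.capacity=16777216
zilla.engine.buffer.slot.capacity=1048576
zilla.engine.routes.buffer.capacity=16777216
"""


def parse_properties(text: str) -> dict:
    """Parse key=value lines into integers, skipping blank and '#' comment lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = int(value.strip())
    return props


for key, value in parse_properties(PROPERTIES).items():
    print(f"{key} = {value} bytes ({value / MIB:g} MiB)")
```

With these settings the buffers are 16 MiB each and the slot capacity is 1 MiB; the commented-out load buffer line is ignored.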
