Skip to content

Commit

Permalink
Improve documentation
Browse files Browse the repository at this point in the history
* Add various JSDocs
* Add `partition` config to KafkaReader
* Change `KafkaGoSaur` constructor to take enums
  • Loading branch information
arjun-1 committed Feb 7, 2022
1 parent f3830af commit 0c42137
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 22 deletions.
26 changes: 17 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# kafkagosaur

[![deno module](https://shield.deno.dev/x/kafkagosaur)](https://deno.land/x/kafkagosaur)
[![deno doc](https://doc.deno.land/badge.svg)](https://doc.deno.land/https/deno.land/x/kafkagosaur/mod.ts)
[![CI](https://github.com/arjun-1/kafkagosaur/actions/workflows/CI.yaml/badge.svg)](https://github.com/arjun-1/kafkagosaur/actions/workflows/CI.yaml)
[![codecov](https://codecov.io/gh/arjun-1/kafkagosaur/branch/master/graph/badge.svg)](https://codecov.io/gh/arjun-1/kafkagosaur)

Expand Down Expand Up @@ -88,7 +89,7 @@ API of kafka-go closely.

## Development

To build the WebAssemnbly module, first run
To build the WebAssembly module, first run

```bash
make build-wasm
Expand All @@ -108,10 +109,13 @@ make test
```

## Performance benchmarks

The Deno benchmarks are located in [bench](bench) and can be run via

```bash
deno run --allow-read --allow-net --allow-env --unstable bench/reader.ts
```

```bash
deno run --allow-read --allow-net --allow-env --unstable bench/writer.ts
```
Expand All @@ -120,18 +124,22 @@ deno run --allow-read --allow-net --allow-env --unstable bench/writer.ts

<img width="423" alt="bench" src="https://user-images.githubusercontent.com/8102654/152859490-81464138-56ff-43d2-92db-7727458a561b.png">

| | kafkagosaur | kafka-go[^2]
|-------------------|-------------|--------------
| writeMessages[^1] | 3977 ± 244 | 4678 ± 207
| readMessage | 1963 ± 190 | 2784 ± 374
| | kafkagosaur[^3] | kafka-go[^2] |
| ----------------- | --------------- | ------------ |
| writeMessages[^1] | 3977 ± 244 | 4678 ± 207 |
| readMessage | 1963 ± 190 | 2784 ± 374 |

[^1]: Batching 10.000 messages.

[^1]: When batching 10.000 messages.
[^2]: When using a single goroutine.
[^2]: Using a single goroutine.

[^3]: Using the `DialBackend.Node`.

#### Environment
- 2,6 GHz 6-Core Intel Core i7
- [Confluent Cloud Basic cluster](https://docs.confluent.io/cloud/current/clusters/cluster-types.html#basic-clusters); 6 partitions

- 2,6 GHz 6-Core Intel Core i7
- [Confluent Cloud Basic cluster](https://docs.confluent.io/cloud/current/clusters/cluster-types.html#basic-clusters);
6 partitions

## Contributing

Expand Down
2 changes: 1 addition & 1 deletion bench/reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { SASLMechanism } from "../security/sasl.ts";
import { bench, runBenchmarks } from "./deps.ts";
import { broker, password, topic, username } from "./config.ts";

const nrOfMessages = 100000;
const nrOfMessages = 10000;

const readerConfig = {
brokers: [broker],
Expand Down
Binary file modified bin/kafkagosaur.wasm
Binary file not shown.
45 changes: 42 additions & 3 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
// @deno-types="./global.d.ts"
import "./lib/wasm_exec.js";
import { deadline, DeadlineError, delay } from "./deps.ts";
import { Dial, setDialOnGlobal } from "./net/connection.ts";
import { DialBackend, setDialOnGlobal } from "./net/connection.ts";
import { dial as nodeDial } from "./net/node-connection.ts";
import { dial as denoDial } from "./net/deno-connection.ts";
import { KafkaDialer, KafkaDialerConfig } from "./dialer.ts";
import { KafkaReader, KafkaReaderConfig } from "./reader.ts";
import { KafkaWriter, KafkaWriterConfig } from "./writer.ts";

/**
* A Kafka client for Deno built using WebAssembly.
*
* ### Example reader
*
* ```typescript
* const kafkaGoSaur = new KafkaGoSaur();
* const reader = await kafkaGoSaur.reader({
* brokers: ["localhost:9092"],
* topic: "test-0",
* });
*
* const readMsg = await reader.readMessage();
* ```
*
* ### Example writer
*
* ```typescript
* const kafkaGoSaur = new KafkaGoSaur();
* const writer = await kafkaGoSaur.writer({
* broker: "localhost:9092",
* topic: "test-0",
* });
*
* const enc = new TextEncoder();
* const msgs = [{ value: enc.encode("value") }];
*
* await writer.writeMessages(msgs);
* ```
*/

const runGoWasm = async (wasmFilePath: string): Promise<void> => {
const go = new global.Go();
const wasmBytes = await Deno.readFile(wasmFilePath);
Expand Down Expand Up @@ -41,8 +73,15 @@ const untilGloballyDefined = (
};

class KafkaGoSaur {
constructor(dial: Dial = nodeDial) {
setDialOnGlobal(dial);
constructor(dialBackend: DialBackend = DialBackend.Node) {
switch (dialBackend) {
case DialBackend.Node:
setDialOnGlobal(nodeDial);
break;
case DialBackend.Deno:
setDialOnGlobal(denoDial);
break;
}
runGoWasm("./bin/kafkagosaur.wasm");
}

Expand Down
21 changes: 12 additions & 9 deletions net/connection.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
/** Represents a network endpoint address, equivalent to a Go net.addr. */
export type Address = {
/**
* Name of the network (for example, "tcp", "udp").
*/
/** Name of the network (for example, "tcp", "udp"). */
network: string;
/**
* String form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80").
*/
/** String form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80"). */
string: string;
};

/**
* A generic stream-oriented network connection, equivalent to a GO net.Conn.
*/
/** A generic stream-oriented network connection, equivalent to a Go net.Conn. */
export interface Connection {
/** Returns the local network address. */
readonly localAddr: Address;
/** Returns the remote network address. */
readonly remoteAddr: Address;

/**
Expand Down Expand Up @@ -42,3 +39,9 @@ export type Dial = (hostname: string, port: number) => Promise<Connection>;
export const setDialOnGlobal = (dial: Dial) => {
(globalThis as Record<string, unknown>).dial = dial;
};

/** Specifies the implementation backing a TCP socket connection. */
export enum DialBackend {
Node,
Deno,
}
35 changes: 35 additions & 0 deletions reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { Header } from "./header.ts";
import { SASLConfig } from "./security/sasl.ts";
import { TLSConfig } from "./security/tls.ts";

/** A data structure representing kafka messages read from `KafkaReader`. */
export type KafkaReadMessage = {
/** Topic indicates which topic this message was consumed from via `KafkaReader`. */
topic: string;
partition: number;
offset: number;
Expand All @@ -13,19 +15,52 @@ export type KafkaReadMessage = {
time: number;
};

/** A configuration object used to create new instances of `KafkaReader`. */
export type KafkaReaderConfig = {
/** The list of broker addresses used to connect to the kafka cluster. */
brokers: string[];
/** The topic to read messages from. */
topic: string;
/**
* Holds the optional consumer group id. If `groupId` is specified, then
* `partition` should NOT be specified.
*/
groupId?: string;
/**
* Partition to read messages from. Either `partition` or `groupId` may
* be assigned, but not both.
*/
partition?: number;
sasl?: SASLConfig;
tls?: TLSConfig;
};

export interface KafkaReader {
/** Closes the stream, preventing the program from reading any more messages from it. */
close: () => Promise<void>;
/** Commits the list of messages passed as argument. */
commitMessages: (msgs: KafkaReadMessage[]) => Promise<void>;
/** Reads and returns the next message. Does not commit offsets automatically
* when using consumer groups. Use `commitMessages` to commit the offset.
*/
fetchMessage: () => Promise<KafkaReadMessage>;
/**
* Reads and returns the next message. If consumer groups are used, `readMessage`
* will automatically commit the offset when called. Note that this could result
* in an offset being committed before the message is fully processed.
*
* If more fine-grained control of when offsets are committed is required, it
* is recommended to use `fetchMessage` with `commitMessages` instead.
*/
readMessage: () => Promise<KafkaReadMessage>;
/**
* SetOffset changes the offset from which the next batch of messages will be
* read.
*/
setOffset: (offset: number) => Promise<void>;
/**
* SetOffsetAt changes the offset from which the next batch of messages will be
* read, given the timestamp.
*/
setOffsetAt: (timeMs: number) => Promise<void>;
}
1 change: 1 addition & 0 deletions security/sasl.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export enum SASLMechanism {
/** Passes the credentials in clear text. */
PLAIN = "PLAIN",
SCRAMSHA512 = "SCRAM-SHA-512",
SCRAMSHA256 = "SCRAM-SHA-256",
Expand Down
14 changes: 14 additions & 0 deletions security/tls.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
/**
* X509KeyPair holds a public/private key of PEM encoded data.
*/
export type X509KeyPair = {
key: string;
cert: string;
};

export type TLSConfig = boolean | {
/**
* Controls whether a client verifies the server's certificate chain and
* host name. If set to true, crypto/tls accepts any certificate presented
* by the server and any host name in that certificate. In this mode, TLS
* is susceptible to machine-in-the-middle attacks unless custom verification
* is used. This should be used only for testing.
*/
insecureSkipVerify?: boolean;
keyPair?: X509KeyPair;
/**
* Defines the set of root certificate authorities that clients use when
* verifying server certificates. If not set, the host's root CA set is used.
*/
ca?: string;
};
4 changes: 4 additions & 0 deletions src/kafkagosaur/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ var NewReaderJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{}
kafkaReaderConfig.GroupID = groupId.String()
}

if partition := readerConfigJs.Get("partition"); !partition.IsUndefined() {
kafkaReaderConfig.Partition = partition.Int()
}

if topic := readerConfigJs.Get("topic"); !topic.IsUndefined() {
kafkaReaderConfig.Topic = topic.String()
}
Expand Down
18 changes: 18 additions & 0 deletions writer.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
import { Header } from "./header.ts";
import { SASLConfig } from "./security/sasl.ts";
import { TLSConfig } from "./security/tls.ts";

/** A data structure representing kafka messages written to `KafkaWriter`. */
export type KafkaWriteMessage = {
/**
* Can be used to configure the topic if not already specified on
* `KafkaWriter` itself.
*/
topic?: string;
offset?: number;
highWaterMark?: number;
key?: Uint8Array;
value?: Uint8Array;
headers?: Header[];
/** If not set, will be automatically set when writing the message. */
time?: number;
};

/** A configuration object used to create new instances of `KafkaWriter`. */
export type KafkaWriterConfig = {
/**
* Name of the topic that `KafkaWriter` will produce messages to.
* This field and the per-message `topic` are mutually exclusive: if you set
* `topic` here, you must not set `topic` on any `KafkaWriteMessage`.
* Conversely, if you do not set `topic` here, every `KafkaWriteMessage` must
* have `topic` specified.
*/
topic?: string;
/** Address of the kafka cluster that this writer is configured to send messages to. */
address: string;
/** Maximum amount of time that connections will remain open and unused. */
idleTimeout?: number;
sasl?: SASLConfig;
tls?: TLSConfig;
};

export interface KafkaWriter {
/** Writes a batch of messages to the configured kafka topic. */
writeMessages: (msgs: KafkaWriteMessage[]) => Promise<void>;
/** Close flushes pending writes, and waits for all writes to complete before returning. */
close: () => Promise<void>;
}

0 comments on commit 0c42137

Please sign in to comment.