From 00aa82dcb43da998f338eff0a640e4b1e1a2ce6e Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Sun, 5 May 2024 13:29:18 +0200 Subject: [PATCH 1/9] chore: use node-prefix --- src/common.ts | 2 +- src/index.ts | 21 +++++++++++---------- src/worker.ts | 5 +++-- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/common.ts b/src/common.ts index f232d065..4ee58695 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,4 +1,4 @@ -import type { MessagePort } from 'worker_threads'; +import type { MessagePort } from 'node:worker_threads'; export const READY = '_WORKER_READY'; diff --git a/src/index.ts b/src/index.ts index cc919410..03d43b9c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,12 +1,14 @@ -import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads'; -import { once, EventEmitterAsyncResource } from 'events'; -import { AsyncResource } from 'async_hooks'; -import { availableParallelism } from 'os'; -import { fileURLToPath, URL } from 'url'; -import { resolve } from 'path'; -import { inspect, types } from 'util'; -import assert from 'assert'; -import { Histogram, RecordableHistogram, createHistogram, performance } from 'perf_hooks'; +import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'node:worker_threads'; +import { once, EventEmitterAsyncResource } from 'node:events'; +import { AsyncResource } from 'node:async_hooks'; +import { availableParallelism } from 'node:os'; +import { fileURLToPath, URL } from 'node:url'; +import { resolve } from 'node:path'; +import { inspect, types } from 'node:util'; +import { Histogram, RecordableHistogram, createHistogram, performance } from 'node:perf_hooks'; +import { setTimeout as sleep } from 'node:timers/promises'; +import assert from 'node:assert'; + import { READY, RequestMessage, @@ -29,7 +31,6 @@ import { } from './common'; import FixedQueue from './fixed-queue'; import { version } from '../package.json'; -import { setTimeout as sleep } from 'timers/promises'; const cpuParallelism : number = availableParallelism(); diff --git a/src/worker.ts b/src/worker.ts index 1cb7d1dc..18c76d11 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,5 +1,6 @@ -import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads'; -import { pathToFileURL } from 'url'; +import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'node:worker_threads'; +import { pathToFileURL } from 'node:url'; + import { READY, commonState, From ca58e07d001b0e286b4c3a37b44c0aafb08f018c Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Sun, 5 May 2024 21:53:05 +0200 Subject: [PATCH 2/9] chore: consolidate common --- src/common.ts | 139 ++++++++++++++------------- src/index.ts | 255 +++++++++++++++----------------------------------- src/worker.ts | 14 ++- 3 files changed, 153 insertions(+), 255 deletions(-) diff --git a/src/common.ts b/src/common.ts index 4ee58695..6f56e508 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,59 +1,44 @@ -import type { MessagePort } from 'node:worker_threads'; +import type { Histogram } from 'node:perf_hooks'; +import { fileURLToPath, URL } from 'node:url'; -export const READY = '_WORKER_READY'; - -export interface StartupMessage { - filename : string | null; - name : string; - port : MessagePort; - sharedBuffer : Int32Array; - useAtomics : boolean; - niceIncrement : number; -} +import type { + PiscinaMovable, + HistogramSummary +} from './types'; +import { kMovable, kTransferable, kValue } from './symbols'; -export interface RequestMessage { - taskId : number; - task : any; - filename: string; - name : string; -} - -export interface ReadyMessage { - [READY]: true -}; - -export interface ResponseMessage { - taskId : number; - result : any; - error: Error | null; -} -export const commonState = { - isWorkerThread: false, - workerData: undefined -}; - -// Internal symbol used to mark Transferable objects returned -// by the Piscina.move() function -const kMovable = Symbol('Piscina.kMovable'); -export const kTransferable = Symbol.for('Piscina.transferable'); -export const kValue = Symbol.for('Piscina.valueOf'); -export const kQueueOptions = Symbol.for('Piscina.queueOptions'); +// States wether the worker is ready to receive tasks +export const READY = '_WORKER_READY'; -// True if the object implements the Transferable interface -export function isTransferable (value : any) : boolean { - return value != null && - typeof value === 'object' && - kTransferable in value && - kValue in value; +/** + * True if the object implements the Transferable interface + * + * @export + * @param {unknown} value + * @return {*} {boolean} + */ +export function isTransferable (value: unknown): boolean { + return ( + value != null && + typeof value === 'object' && + kTransferable in value && + kValue in value + ); } -// True if object implements Transferable and has been returned -// by the Piscina.move() function -export function isMovable (value : any) : boolean { +/** + * True if object implements Transferable and has been returned + * by the Piscina.move() function + * + * @export + * @param {(unknown & PiscinaMovable)} value + * @return {*} {boolean} + */ +export function isMovable (value: unknown & PiscinaMovable): boolean { return isTransferable(value) && value[kMovable] === true; } -export function markMovable (value : object) : void { +export function markMovable (value: {}): void { Object.defineProperty(value, kMovable, { enumerable: false, configurable: true, @@ -62,31 +47,45 @@ export function markMovable (value : object) : void { }); } -export interface Transferable { - readonly [kTransferable] : object; - readonly [kValue] : object; -} +// State of Piscina pool +export const commonState = { + isWorkerThread: false, + workerData: undefined +}; -export interface Task { - readonly [kQueueOptions] : object | null; -} +export function createHistogramSummary (histogram: Histogram): HistogramSummary { + const { mean, stddev, min, max } = histogram; -export interface TaskQueue { - readonly size : number; - shift () : Task | null; - remove (task : Task) : void; - push (task : Task) : void; + return { + average: mean / 1000, + mean: mean / 1000, + stddev, + min: min / 1000, + max: max / 1000, + p0_001: histogram.percentile(0.001) / 1000, + p0_01: histogram.percentile(0.01) / 1000, + p0_1: histogram.percentile(0.1) / 1000, + p1: histogram.percentile(1) / 1000, + p2_5: histogram.percentile(2.5) / 1000, + p10: histogram.percentile(10) / 1000, + p25: histogram.percentile(25) / 1000, + p50: histogram.percentile(50) / 1000, + p75: histogram.percentile(75) / 1000, + p90: histogram.percentile(90) / 1000, + p97_5: histogram.percentile(97.5) / 1000, + p99: histogram.percentile(99) / 1000, + p99_9: histogram.percentile(99.9) / 1000, + p99_99: histogram.percentile(99.99) / 1000, + p99_999: histogram.percentile(99.999) / 1000 + }; } -export function isTaskQueue (value : any) : boolean { - return typeof value === 'object' && - value !== null && - 'size' in value && - typeof value.shift === 'function' && - typeof value.remove === 'function' && - typeof value.push === 'function'; +export function toHistogramIntegerNano (milliseconds: number): number { + return Math.max(1, Math.trunc(milliseconds * 1000)); } -export const kRequestCountField = 0; -export const kResponseCountField = 1; -export const kFieldCount = 2; +export function maybeFileURLToPath (filename : string) : string { + return filename.startsWith('file:') + ? fileURLToPath(new URL(filename)) + : filename; +} diff --git a/src/index.ts b/src/index.ts index 03d43b9c..9d985ac4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,158 +2,66 @@ import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'node: import { once, EventEmitterAsyncResource } from 'node:events'; import { AsyncResource } from 'node:async_hooks'; import { availableParallelism } from 'node:os'; -import { fileURLToPath, URL } from 'node:url'; import { resolve } from 'node:path'; import { inspect, types } from 'node:util'; -import { Histogram, RecordableHistogram, createHistogram, performance } from 'node:perf_hooks'; +import { RecordableHistogram, createHistogram, performance } from 'node:perf_hooks'; import { setTimeout as sleep } from 'node:timers/promises'; +import { readFileSync } from 'node:fs'; import assert from 'node:assert'; -import { - READY, +import type { RequestMessage, ResponseMessage, StartupMessage, - commonState, + Transferable, + Task, + ResourceLimits, + EnvSpecifier, + TaskCallback, + TransferList, + TransferListItem +} from './types'; +import { kResponseCountField, kRequestCountField, kFieldCount, - Transferable, - Task, - TaskQueue, kQueueOptions, + kTransferable, + kValue +} from './symbols'; +import { + TaskQueue, isTaskQueue, + ArrayTaskQueue +} from './task_queue'; +import { + AbortSignalAny, + AbortSignalEventTarget, + AbortError, + AbortSignalEventEmitter, + onabort +} from './abort'; +import { + READY, + commonState, isTransferable, markMovable, isMovable, - kTransferable, - kValue + createHistogramSummary, + toHistogramIntegerNano, + maybeFileURLToPath } from './common'; import FixedQueue from './fixed-queue'; -import { version } from '../package.json'; - -const cpuParallelism : number = availableParallelism(); - -/* eslint-disable camelcase */ -interface HistogramSummary { - average: number; - mean: number; - stddev: number; - min: number; - max: number; - p0_001: number; - p0_01: number; - p0_1: number; - p1: number; - p2_5: number; - p10: number; - p25: number; - p50: number; - p75: number; - p90: number; - p97_5: number; - p99: number; - p99_9: number; - p99_99: number; - p99_999: number; -} -/* eslint-enable camelcase */ - -function createHistogramSummary (histogram: Histogram): HistogramSummary { - const { mean, stddev, min, max } = histogram; - - return { - average: mean / 1000, - mean: mean / 1000, - stddev, - min: min / 1000, - max: max / 1000, - p0_001: histogram.percentile(0.001) / 1000, - p0_01: histogram.percentile(0.01) / 1000, - p0_1: histogram.percentile(0.1) / 1000, - p1: histogram.percentile(1) / 1000, - p2_5: histogram.percentile(2.5) / 1000, - p10: histogram.percentile(10) / 1000, - p25: histogram.percentile(25) / 1000, - p50: histogram.percentile(50) / 1000, - p75: histogram.percentile(75) / 1000, - p90: histogram.percentile(90) / 1000, - p97_5: histogram.percentile(97.5) / 1000, - p99: histogram.percentile(99) / 1000, - p99_9: histogram.percentile(99.9) / 1000, - p99_99: histogram.percentile(99.99) / 1000, - p99_999: histogram.percentile(99.999) / 1000 - }; -} - -function toHistogramIntegerNano (milliseconds: number): number { - return Math.max(1, Math.trunc(milliseconds * 1000)); -} - -interface AbortSignalEventTargetAddOptions { - once : boolean; -}; - -interface AbortSignalEventTarget { - addEventListener : ( - name : 'abort', - listener : () => void, - options? : AbortSignalEventTargetAddOptions) => void; - removeEventListener : ( - name : 'abort', - listener : () => void) => void; - aborted? : boolean; - reason?: unknown; -} -interface AbortSignalEventEmitter { - off : (name : 'abort', listener : () => void) => void; - once : (name : 'abort', listener : () => void) => void; -} -type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter; -function onabort (abortSignal : AbortSignalAny, listener : () => void) { - if ('addEventListener' in abortSignal) { - abortSignal.addEventListener('abort', listener, { once: true }); - } else { - abortSignal.once('abort', listener); - } -} - -class AbortError extends Error { - constructor (reason?: AbortSignalEventTarget['reason']) { - // TS does not recognizes the cause clause - // @ts-expect-error - super('The task has been aborted', { cause: reason }); - } - - get name () { return 'AbortError'; } -} - -type ResourceLimits = Worker extends { - resourceLimits? : infer T; -} ? T : {}; -type EnvSpecifier = typeof Worker extends { - new (filename : never, options?: { env: infer T }) : Worker; -} ? T : never; -export class ArrayTaskQueue implements TaskQueue { - tasks : Task[] = []; - - get size () { return this.tasks.length; } - - shift () : Task | null { - return this.tasks.shift() as Task; - } - - push (task : Task) : void { - this.tasks.push(task); - } +const { version } = JSON.parse( + readFileSync( + resolve(__dirname, '..', 'package.json'), + { + encoding: 'utf-8' + } + )); - remove (task : Task) : void { - const index = this.tasks.indexOf(task); - assert.notStrictEqual(index, -1); - this.tasks.splice(index, 1); - } -} +const cpuParallelism : number = availableParallelism(); interface Options { filename? : string | null, @@ -191,6 +99,28 @@ interface FilledOptions extends Options { recordTiming : boolean } +interface RunOptions { + transferList? : TransferList, + filename? : string | null, + signal? : AbortSignalAny | null, + name? : string | null +} + +interface FilledRunOptions extends RunOptions { + transferList : TransferList | never, + filename : string | null, + signal : AbortSignalAny | null, + name : string | null +} + +interface CloseOptions { + force?: boolean, +} + +type ResponseCallback = (response : ResponseMessage) => void; + +let taskIdCounter = 0; + const kDefaultOptions : FilledOptions = { filename: null, name: 'default', @@ -207,20 +137,6 @@ const kDefaultOptions : FilledOptions = { recordTiming: true }; -interface RunOptions { - transferList? : TransferList, - filename? : string | null, - signal? : AbortSignalAny | null, - name? : string | null -} - -interface FilledRunOptions extends RunOptions { - transferList : TransferList | never, - filename : string | null, - signal : AbortSignalAny | null, - name : string | null -} - const kDefaultRunOptions : FilledRunOptions = { transferList: undefined, filename: null, @@ -228,14 +144,23 @@ const kDefaultRunOptions : FilledRunOptions = { name: null }; -interface CloseOptions { - force?: boolean, -} - const kDefaultCloseOptions : Required = { force: false }; +const Errors = { + ThreadTermination: + () => new Error('Terminating worker thread'), + FilenameNotProvided: + () => new Error('filename must be provided to run() or in options object'), + TaskQueueAtLimit: + () => new Error('Task queue is at limit'), + NoTaskQueueAvailable: + () => new Error('No task queue available and all Workers are busy'), + CloseTimeout: + () => new Error('Close operation timed out') +}; + class DirectlyTransferable implements Transferable { #value : object; constructor (value : object) { @@ -258,21 +183,6 @@ class ArrayBufferViewTransferable implements Transferable { get [kValue] () : object { return this.#view; } } -let taskIdCounter = 0; - -type TaskCallback = (err : Error, result: any) => void; -// Grab the type of `transferList` off `MessagePort`. At the time of writing, -// only ArrayBuffer and MessagePort are valid, but let's avoid having to update -// our types here every time Node.js adds support for more objects. -type TransferList = MessagePort extends { postMessage(value : any, transferList : infer T) : any; } ? T : never; -type TransferListItem = TransferList extends (infer T)[] ? T : never; - -function maybeFileURLToPath (filename : string) : string { - return filename.startsWith('file:') - ? fileURLToPath(new URL(filename)) - : filename; -} - // Extend AsyncResource so that async relations between posting a task and // receiving its result are visible to diagnostic tools. class TaskInfo extends AsyncResource implements Task { @@ -443,21 +353,6 @@ class AsynchronouslyCreatedResourcePool< } } -type ResponseCallback = (response : ResponseMessage) => void; - -const Errors = { - ThreadTermination: - () => new Error('Terminating worker thread'), - FilenameNotProvided: - () => new Error('filename must be provided to run() or in options object'), - TaskQueueAtLimit: - () => new Error('Task queue is at limit'), - NoTaskQueueAvailable: - () => new Error('No task queue available and all Workers are busy'), - CloseTimeout: - () => new Error('Close operation timed out') -}; - class WorkerInfo extends AsynchronouslyCreatedResource { worker : Worker; taskInfos : Map; diff --git a/src/worker.ts b/src/worker.ts index 18c76d11..a829cd2f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,18 +1,22 @@ import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'node:worker_threads'; import { pathToFileURL } from 'node:url'; -import { - READY, - commonState, +import type { ReadyMessage, RequestMessage, ResponseMessage, - StartupMessage, + StartupMessage +} from './types'; +import { kResponseCountField, kRequestCountField, - isMovable, kTransferable, kValue +} from './symbols'; +import { + READY, + commonState, + isMovable } from './common'; commonState.isWorkerThread = true; From 899c8705f69284ccfa3f5834bbd01c2018bedd89 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Sun, 5 May 2024 21:53:41 +0200 Subject: [PATCH 3/9] chore: scope implementations together --- src/abort.ts | 41 +++++++++++++++++++++ src/symbols.ts | 9 +++++ src/task_queue.ts | 50 +++++++++++++++++++++++++ src/types.ts | 94 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 194 insertions(+) create mode 100644 src/abort.ts create mode 100644 src/symbols.ts create mode 100644 src/task_queue.ts create mode 100644 src/types.ts diff --git a/src/abort.ts b/src/abort.ts new file mode 100644 index 00000000..1484a810 --- /dev/null +++ b/src/abort.ts @@ -0,0 +1,41 @@ +interface AbortSignalEventTargetAddOptions { + once: boolean; +} + +export interface AbortSignalEventTarget { + addEventListener: ( + name: 'abort', + listener: () => void, + options?: AbortSignalEventTargetAddOptions + ) => void; + removeEventListener: (name: 'abort', listener: () => void) => void; + aborted?: boolean; + reason?: unknown; +} + +export interface AbortSignalEventEmitter { + off: (name: 'abort', listener: () => void) => void; + once: (name: 'abort', listener: () => void) => void; +} + +export type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter; + +export class AbortError extends Error { + constructor (reason?: AbortSignalEventTarget['reason']) { + // TS does not recognizes the cause clause + // @ts-expect-error + super('The task has been aborted', { cause: reason }); + } + + get name () { + return 'AbortError'; + } +} + +export function onabort (abortSignal: AbortSignalAny, listener: () => void) { + if ('addEventListener' in abortSignal) { + abortSignal.addEventListener('abort', listener, { once: true }); + } else { + abortSignal.once('abort', listener); + } +} diff --git a/src/symbols.ts b/src/symbols.ts new file mode 100644 index 00000000..b3c1d678 --- /dev/null +++ b/src/symbols.ts @@ -0,0 +1,9 @@ +// Internal symbol used to mark Transferable objects returned +// by the Piscina.move() function +export const kMovable = Symbol('Piscina.kMovable'); +export const kTransferable = Symbol.for('Piscina.transferable'); +export const kValue = Symbol.for('Piscina.valueOf'); +export const kQueueOptions = Symbol.for('Piscina.queueOptions'); +export const kRequestCountField = 0; +export const kResponseCountField = 1; +export const kFieldCount = 2; diff --git a/src/task_queue.ts b/src/task_queue.ts new file mode 100644 index 00000000..a5c80a87 --- /dev/null +++ b/src/task_queue.ts @@ -0,0 +1,50 @@ +import assert from 'node:assert'; + +import type { Task } from './types'; + +export interface TaskQueue { + readonly size: number; + shift(): Task | null; + remove(task: Task): void; + push(task: Task): void; +} + +export class ArrayTaskQueue implements TaskQueue { + tasks: Task[] = []; + + get size () { + return this.tasks.length; + } + + shift (): Task | null { + return this.tasks.shift() as Task; + } + + push (task: Task): void { + this.tasks.push(task); + } + + remove (task: Task): void { + const index = this.tasks.indexOf(task); + assert.notStrictEqual(index, -1); + this.tasks.splice(index, 1); + } +} + +/** + * Verifies if a given TaskQueue is valid + * + * @export + * @param {*} value + * @return {*} {boolean} + */ +export function isTaskQueue (value: TaskQueue): boolean { + return ( + typeof value === 'object' && + value !== null && + 'size' in value && + typeof value.shift === 'function' && + typeof value.remove === 'function' && + typeof value.push === 'function' + ); +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 00000000..62b7c283 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,94 @@ +import type { MessagePort, Worker } from 'node:worker_threads'; + +import type { READY } from './common'; +import type { kTransferable, kValue, kQueueOptions, kMovable } from './symbols'; + +export interface StartupMessage { + filename: string | null; + name: string; + port: MessagePort; + sharedBuffer: Int32Array; + useAtomics: boolean; + niceIncrement: number; +} + +export type PiscinaMovable = { + [kMovable]: boolean; +}; + +export interface RequestMessage { + taskId: number; + task: any; + filename: string; + name: string; +} + +export interface ReadyMessage { + [READY]: true; +} + +export interface ResponseMessage { + taskId: number; + result: any; + error: Error | null; +} +export const commonState = { + isWorkerThread: false, + workerData: undefined +}; + +export interface Transferable { + readonly [kTransferable]: object; + readonly [kValue]: object; +} + +export interface Task { + readonly [kQueueOptions]: object | null; +} + +/* eslint-disable camelcase */ +export interface HistogramSummary { + average: number; + mean: number; + stddev: number; + min: number; + max: number; + p0_001: number; + p0_01: number; + p0_1: number; + p1: number; + p2_5: number; + p10: number; + p25: number; + p50: number; + p75: number; + p90: number; + p97_5: number; + p99: number; + p99_9: number; + p99_99: number; + p99_999: number; +} +/* eslint-enable camelcase */ + +export type ResourceLimits = Worker extends { + resourceLimits?: infer T; +} + ? T + : {}; +export type EnvSpecifier = typeof Worker extends { + new (filename: never, options?: { env: infer T }): Worker; +} + ? T + : never; + +export type TaskCallback = (err: Error, result: any) => void; +// Grab the type of `transferList` off `MessagePort`. At the time of writing, +// only ArrayBuffer and MessagePort are valid, but let's avoid having to update +// our types here every time Node.js adds support for more objects. +export type TransferList = MessagePort extends { + postMessage(value: any, transferList: infer T): any; +} + ? T + : never; +export type TransferListItem = TransferList extends (infer T)[] ? T : never; From 2073ba0d59df66d592a5520273b86ed9f2fb3f1b Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Sun, 5 May 2024 22:00:42 +0200 Subject: [PATCH 4/9] chore: ts root dir --- package.json | 2 +- tsconfig.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 78c2931f..cd25ec2f 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "piscina", "version": "4.5.0", "description": "A fast, efficient Node.js Worker Thread Pool implementation", - "main": "./dist/src/main.js", + "main": "./dist/main.js", "exports": { "types": "./dist/src/index.d.ts", "import": "./dist/esm-wrapper.mjs", diff --git a/tsconfig.json b/tsconfig.json index 921da16d..1cb3ed5f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,7 +5,7 @@ "moduleResolution": "node", "lib": ["es2019"], "outDir": "dist", - "rootDir": ".", + "rootDir": "./src", "declaration": true, "sourceMap": true, From f3c117b72814fab19af01414e1e0bbbd2022697c Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 8 May 2024 10:48:11 +0200 Subject: [PATCH 5/9] chore: adjust entrypoints --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index cd25ec2f..9964e479 100644 --- a/package.json +++ b/package.json @@ -4,9 +4,9 @@ "description": "A fast, efficient Node.js Worker Thread Pool implementation", "main": "./dist/main.js", "exports": { - "types": "./dist/src/index.d.ts", + "types": "./dist/index.d.ts", "import": "./dist/esm-wrapper.mjs", - "require": "./dist/src/main.js" + "require": "./dist/main.js" }, "types": "./dist/src/index.d.ts", "scripts": { From ade5c2476c3c80136d01246dfc16e8b8516bacb8 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 15 May 2024 10:20:52 +0200 Subject: [PATCH 6/9] fix: revert type --- src/common.ts | 8 +++----- src/types.ts | 6 +----- test/move-test.ts | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/common.ts b/src/common.ts index 6f56e508..51caa22e 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,10 +1,7 @@ import type { Histogram } from 'node:perf_hooks'; import { fileURLToPath, URL } from 'node:url'; -import type { - PiscinaMovable, - HistogramSummary -} from './types'; +import type { HistogramSummary } from './types'; import { kMovable, kTransferable, kValue } from './symbols'; // States wether the worker is ready to receive tasks @@ -30,11 +27,12 @@ export function isTransferable (value: unknown): boolean { * True if object implements Transferable and has been returned * by the Piscina.move() function * + * TODO: narrow down the type of value * @export * @param {(unknown & PiscinaMovable)} value * @return {*} {boolean} */ -export function isMovable (value: unknown & PiscinaMovable): boolean { +export function isMovable (value: any): boolean { return isTransferable(value) && value[kMovable] === true; } diff --git a/src/types.ts b/src/types.ts index 62b7c283..08b8d8d5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,7 +1,7 @@ import type { MessagePort, Worker } from 'node:worker_threads'; import type { READY } from './common'; -import type { kTransferable, kValue, kQueueOptions, kMovable } from './symbols'; +import type { kTransferable, kValue, kQueueOptions } from './symbols'; export interface StartupMessage { filename: string | null; @@ -12,10 +12,6 @@ export interface StartupMessage { niceIncrement: number; } -export type PiscinaMovable = { - [kMovable]: boolean; -}; - export interface RequestMessage { taskId: number; task: any; diff --git a/test/move-test.ts b/test/move-test.ts index e00e82c2..a4e5e542 100644 --- a/test/move-test.ts +++ b/test/move-test.ts @@ -3,7 +3,7 @@ import { isMovable, markMovable, isTransferable -} from '../dist/src/common'; +} from '../dist/common'; import { test } from 'tap'; import { types } from 'util'; import { MessageChannel, MessagePort } from 'worker_threads'; From 00bf2a2bf3761a5c6b7351b3640696ece1582f39 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 15 May 2024 10:40:48 +0200 Subject: [PATCH 7/9] test: fix imports --- test/fixtures/send-buffer-then-get-length.js | 2 +- test/fixtures/send-transferrable-then-get-length.js | 2 +- test/messages.ts | 2 +- test/task-queue.ts | 3 ++- test/test-is-buffer-transferred.ts | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/test/fixtures/send-buffer-then-get-length.js b/test/fixtures/send-buffer-then-get-length.js index 585b911d..a1d154b3 100644 --- a/test/fixtures/send-buffer-then-get-length.js +++ b/test/fixtures/send-buffer-then-get-length.js @@ -1,6 +1,6 @@ 'use strict'; -const Piscina = require('../../dist/src'); +const Piscina = require('../../dist'); let time; module.exports = { diff --git a/test/fixtures/send-transferrable-then-get-length.js b/test/fixtures/send-transferrable-then-get-length.js index 82174d2a..829b4d5f 100644 --- a/test/fixtures/send-transferrable-then-get-length.js +++ b/test/fixtures/send-transferrable-then-get-length.js @@ -1,6 +1,6 @@ 'use strict'; -const Piscina = require('../../dist/src'); +const Piscina = require('../../dist'); class Shared { constructor (data) { diff --git a/test/messages.ts b/test/messages.ts index b6e5bdf7..ce9a026d 100644 --- a/test/messages.ts +++ b/test/messages.ts @@ -1,4 +1,4 @@ -import Piscina from '../dist/src'; +import Piscina from '..'; import { test } from 'tap'; import { resolve } from 'path'; import { once } from 'events'; diff --git a/test/task-queue.ts b/test/task-queue.ts index 5bf53785..9a55d764 100644 --- a/test/task-queue.ts +++ b/test/task-queue.ts @@ -1,7 +1,8 @@ import Piscina from '..'; import { test } from 'tap'; import { resolve } from 'path'; -import { Task, TaskQueue } from '../dist/src/common'; +import { TaskQueue } from '../dist/task_queue'; +import { Task } from '../dist/types'; test('will put items into a task queue until they can run', async ({ equal }) => { const pool = new Piscina({ diff --git a/test/test-is-buffer-transferred.ts b/test/test-is-buffer-transferred.ts index 63bcf13e..878386ee 100644 --- a/test/test-is-buffer-transferred.ts +++ b/test/test-is-buffer-transferred.ts @@ -1,4 +1,4 @@ -import Piscina from '../dist/src'; +import Piscina from '..'; import { test } from 'tap'; import { resolve } from 'path'; From c833817700b7160aa3990790e4a0f1691f30bbce Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Mon, 20 May 2024 11:24:55 +0200 Subject: [PATCH 8/9] fix: imports --- src/fixed-queue.ts | 3 ++- test/fixed-queue.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/fixed-queue.ts b/src/fixed-queue.ts index bb57a066..2bc2a4af 100644 --- a/src/fixed-queue.ts +++ b/src/fixed-queue.ts @@ -4,7 +4,8 @@ * Source: https://github.com/nodejs/node/blob/de7b37880f5a541d5f874c1c2362a65a4be76cd0/lib/internal/fixed_queue.js */ import assert from 'node:assert'; -import { TaskQueue, Task } from './common'; +import { TaskQueue } from './task_queue'; +import { Task } from './types'; // Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. const kSize = 2048; const kMask = kSize - 1; diff --git a/test/fixed-queue.ts b/test/fixed-queue.ts index b1bd3312..085e0132 100644 --- a/test/fixed-queue.ts +++ b/test/fixed-queue.ts @@ -1,5 +1,6 @@ import { test } from 'tap'; -import { Task, kQueueOptions } from '../dist/src/common'; +import { Task } from '../dist/types'; +import { kQueueOptions } from '../dist/symbols'; import { Piscina, FixedQueue } from '..'; import { resolve } from 'node:path'; From 963ae8859c4f89b45f25aabb46434c2042e09b74 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Mon, 20 May 2024 12:46:39 +0200 Subject: [PATCH 9/9] fix: types path --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 9964e479..4b9ec61c 100644 --- a/package.json +++ b/package.json @@ -3,12 +3,12 @@ "version": "4.5.0", "description": "A fast, efficient Node.js Worker Thread Pool implementation", "main": "./dist/main.js", + "types": "./dist/index.d.ts", "exports": { "types": "./dist/index.d.ts", "import": "./dist/esm-wrapper.mjs", "require": "./dist/main.js" }, - "types": "./dist/src/index.d.ts", "scripts": { "build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs", "lint": "standardx \"**/*.{ts,mjs,js,cjs}\" | snazzy",