Skip to content

Commit

Permalink
feat: Queue "cached" commands if its execution timed out
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkvak committed Feb 13, 2020
1 parent 6b9c13b commit b024999
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ export function OperationTimeoutError(timeout: number): Error {

return customError(ERRORS.OperationTimeoutError, text);
}

export function isOperationTimeoutError(error: any): boolean {
return error instanceof Error && error.name === ERRORS.OperationTimeoutError;
}
17 changes: 17 additions & 0 deletions src/storages/base.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import TestStorageAdapter from '../adapters/test';
import { ConnectionStatus } from '../connection-status';
import { OperationTimeoutError } from '../errors';
import timeout from '../timeout';
import { BaseStorage, TAGS_VERSIONS_ALIAS } from './base';

Expand Down Expand Up @@ -271,6 +272,22 @@ describe('BaseStorage', () => {
expect(storage.commandsQueue).toEqual([{ fn: command, params: [1, 'hello' ]}]);
});

it('cachedCommand pushes command to command queue if execution timed out', async () => {
const error = OperationTimeoutError(1);
const command = jest.fn().mockRejectedValue(error);

await expect((storage as any).cachedCommand(command, 1, 'hello')).resolves.toEqual(undefined);
expect(storage.commandsQueue).toEqual([{ fn: command, params: [1, 'hello' ]}]);
});

it('cachedCommand throws if command execution fails and not timed out', async () => {
const error = new Error();
const command = jest.fn().mockRejectedValue(error);

await expect((storage as any).cachedCommand(command, 1, 'hello')).rejects.toThrowError(error);
expect(storage.commandsQueue.length).toEqual(0);
});

it('executeCommandsFromQueue does nothing if queue is empty', async () => {
await expect((storage as any).executeCommandsFromQueue()).resolves.not.toThrow();
});
Expand Down
27 changes: 19 additions & 8 deletions src/storages/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import isFunction from 'lodash/isFunction';
import uniq from 'lodash/uniq';
import { ConnectionStatus } from '../connection-status';
import deserialize from '../deserialize';
import { isOperationTimeoutError } from '../errors';
import createRecord from '../record';
import createTag from '../record/create-tag';
import serialize from '../serialize';
Expand Down Expand Up @@ -30,7 +31,7 @@ export type BaseStorageOptions = {
*/
export interface Command {
fn: CommandFn;
params: any;
params: any[];
}

/**
Expand Down Expand Up @@ -234,7 +235,7 @@ export class BaseStorage implements Storage {
/**
* Executes commands from offline queue. Re-queues commands which was not successfully executed.
*/
private async executeCommandsFromQueue(): Promise<any> {
private async executeCommandsFromQueue(): Promise<void> {
if (!this.commandsQueue.length) {
return;
}
Expand All @@ -254,7 +255,8 @@ export class BaseStorage implements Storage {

/**
* All commands wrapped with this method will be "cached". This means that if there are problems with the connection
* the response will be sent immediately and the command will be executed later when the connection is restored.
* the response will be sent immediately and the command will be executed later when the connection is restored
* or current execution timed out.
*/
private async cachedCommand(fn: CommandFn, ...args: any[]): Promise<void> {
if (!fn) {
Expand All @@ -264,15 +266,24 @@ export class BaseStorage implements Storage {
const connectionStatus = this.adapter.getConnectionStatus();

if (connectionStatus !== ConnectionStatus.CONNECTED) {
this.commandsQueue.push({
fn,
params: args
});
this.queueCommand(fn, args);
} else {
await fn(...args);
try {
await fn(...args);
} catch (error) {
if (isOperationTimeoutError(error)) {
this.queueCommand(fn, args);
} else {
throw error;
}
}
}
}

private queueCommand(fn: CommandFn, params: any[]): void {
this.commandsQueue.push({ fn, params });
}

/**
* Saves only new not touched tags in tag storage.
*/
Expand Down

0 comments on commit b024999

Please sign in to comment.