Skip to content

Commit

Permalink
@web5/agent DWN Subscriptions Support (#492)
Browse files Browse the repository at this point in the history
* support subscriptions in agent

* update api and server to support prune from newest dwn-sdk-js

* implement subscriptions over sockets for agent `sendRequest` flow
  • Loading branch information
LiranCohen committed May 13, 2024
1 parent 794d18c commit b516a5f
Show file tree
Hide file tree
Showing 17 changed files with 795 additions and 70 deletions.
10 changes: 10 additions & 0 deletions .changeset/six-pandas-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

- `@web5/agent` DWN Subscriptions Support
- `@web5/agent` supports latest `dwn-sdk-js` with `prune` feature from `RecordsWriteDelete`

5 changes: 5 additions & 0 deletions .changeset/twelve-trainers-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@web5/api": patch
---

`@web5/api` supports `prune` via `RecordsWriteDelete`
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"@changesets/cli": "^2.27.1",
"@npmcli/package-json": "5.0.0",
"@typescript-eslint/eslint-plugin": "6.4.0",
"@web5/dwn-server": "0.2.1",
"@web5/dwn-server": "0.2.2",
"eslint-plugin-mocha": "10.1.0",
"npkill": "0.11.3"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"dependencies": {
"@noble/ciphers": "0.4.1",
"@scure/bip39": "1.2.2",
"@tbd54566975/dwn-sdk-js": "0.3.1",
"@tbd54566975/dwn-sdk-js": "0.3.2",
"@web5/common": "1.0.0",
"@web5/crypto": "1.0.0",
"@web5/dids": "1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/src/did-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class AgentDidApi<TKeyManager extends AgentKeyManager = AgentKeyManager>
// Resolve the DID document.
const { didDocument, didResolutionMetadata } = await this.resolve(didUri);
if (!didDocument) {
throw new Error(`DID resolution failed for '${didUri}': ${didResolutionMetadata.error}`);
throw new Error(`DID resolution failed for '${didUri}': ${JSON.stringify(didResolutionMetadata)}`);
}

// Retrieve the method-specific verification method to be used for signing operations.
Expand Down
49 changes: 44 additions & 5 deletions packages/agent/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/
import { Convert, NodeStream } from '@web5/common';
import { utils as cryptoUtils } from '@web5/crypto';
import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
import { Cid, DataStoreLevel, Dwn, EventLogLevel, Message, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';

import type { Web5PlatformAgent } from './types/agent.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';

import { DwnInterface, dwnMessageConstructors } from './types/dwn.js';
import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js';
Expand All @@ -32,6 +32,13 @@ export function isDwnRequest<T extends DwnInterface>(
return dwnRequest.messageType === messageType;
}

export function isDwnMessage<T extends DwnInterface>(
messageType: T, message: GenericMessage
): message is DwnMessage[T] {
const incomingMessageInterfaceName = message.descriptor.interface + message.descriptor.method;
return incomingMessageInterfaceName === messageType;
}

export class AgentDwnApi {
/**
* Holds the instance of a `Web5PlatformAgent` that represents the current execution context for
Expand Down Expand Up @@ -114,13 +121,16 @@ export class AgentDwnApi {
// Readable stream.
const { message, dataStream } = await this.constructDwnMessage({ request });

// Extracts the optional subscription handler from the request to pass into `processMessage.
const { subscriptionHandler } = request;

// Conditionally processes the message with the DWN instance:
// - If `store` is not explicitly set to false, it sends the message to the DWN node for
// processing, passing along the target DID, the message, and any associated data stream.
// - If `store` is set to false, it immediately returns a simulated 'accepted' status without
// storing the message/data in the DWN node.
const reply: DwnMessageReply[T] = (request.store !== false)
? await this._dwn.processMessage(request.target, message, { dataStream })
? await this._dwn.processMessage(request.target, message, { dataStream, subscriptionHandler })
: { status: { code: 202, detail: 'Accepted' } };

// Returns an object containing the reply from processing the message, the original message,
Expand All @@ -144,6 +154,7 @@ export class AgentDwnApi {
let messageCid: string | undefined;
let message: DwnMessage[T];
let data: Blob | undefined;
let subscriptionHandler: MessageHandler[T] | undefined;

// If `messageCid` is given, retrieve message and data, if any.
if ('messageCid' in request) {
Expand All @@ -161,14 +172,16 @@ export class AgentDwnApi {
throw new Error('AgentDwnApi: DataStream must be provided as a Blob');
}
data = request.dataStream;
subscriptionHandler = request.subscriptionHandler;
}

// Send the RPC request to the target DID's DWN service endpoint using the Agent's RPC client.
const reply = await this.sendDwnRpcRequest({
targetDid: request.target,
dwnEndpointUrls,
message,
data
data,
subscriptionHandler
});

// If the message CID was not given in the `request`, compute it.
Expand All @@ -180,25 +193,51 @@ export class AgentDwnApi {
}

private async sendDwnRpcRequest<T extends DwnInterface>({
targetDid, dwnEndpointUrls, message, data
targetDid, dwnEndpointUrls, message, data, subscriptionHandler
}: {
targetDid: string;
dwnEndpointUrls: string[];
message: DwnMessage[T];
data?: Blob;
subscriptionHandler?: MessageHandler[T];
}
): Promise<DwnMessageReply[T]> {
const errorMessages: { url: string, message: string }[] = [];

if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionHandler === undefined) {
throw new Error('AgentDwnApi: Subscription handler is required for subscription requests.');
}

// Try sending to author's publicly addressable DWNs until the first request succeeds.
for (let dwnUrl of dwnEndpointUrls) {
try {
if (subscriptionHandler !== undefined) {
// we get the server info to check if the server supports WebSocket for subscription requests
const serverInfo = await this.agent.rpc.getServerInfo(dwnUrl);
if (!serverInfo.webSocketSupport) {
// If the server does not support WebSocket, add an error message and continue to the next URL.
errorMessages.push({
url : dwnUrl,
message : 'WebSocket support is not enabled on the server.'
});
continue;
}

// If the server supports WebSocket, replace the subscription URL with a socket transport.
// For `http` we use the unsecured `ws` protocol, and for `https` we use the secured `wss` protocol.
const parsedUrl = new URL(dwnUrl);
parsedUrl.protocol = parsedUrl.protocol === 'http:' ? 'ws:' : 'wss:';
dwnUrl = parsedUrl.toString();
}

const dwnReply = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid,
message,
data,
subscriptionHandler
});

return dwnReply;
} catch(error: any) {
errorMessages.push({
Expand Down
6 changes: 3 additions & 3 deletions packages/agent/src/prototyping/clients/dwn-rpc-types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import type { RecordsReadReply, UnionMessageReply, EventSubscriptionHandler, RecordSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

export interface SerializableDwnMessage {
toJSON(): string;
}

export type DwnEventSubscriptionHandler = (event: MessageEvent) => void;
export type DwnSubscriptionHandler = EventSubscriptionHandler | RecordSubscriptionHandler;

/**
* Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers}
Expand Down Expand Up @@ -45,7 +45,7 @@ export type DwnRpcRequest = {
targetDid: string;

/** Optional subscription handler for DWN events. */
subscriptionHandler?: DwnEventSubscriptionHandler;
subscriptionHandler?: DwnSubscriptionHandler;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/agent/src/prototyping/clients/web-socket-clients.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DwnEventSubscriptionHandler, DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js';
import type { DwnRpc, DwnRpcRequest, DwnRpcResponse, DwnSubscriptionHandler } from './dwn-rpc-types.js';
import type { GenericMessage, MessageSubscription, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';

import { utils as cryptoUtils } from '@web5/crypto';
Expand Down Expand Up @@ -60,7 +60,7 @@ export class WebSocketDwnRpcClient implements DwnRpc {
return result.reply as DwnRpcResponse;
}

private static async subscriptionRequest(connection: SocketConnection, target:string, message: GenericMessage, messageHandler: DwnEventSubscriptionHandler): Promise<DwnRpcResponse> {
private static async subscriptionRequest(connection: SocketConnection, target:string, message: GenericMessage, messageHandler: DwnSubscriptionHandler): Promise<DwnRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const subscriptionId = cryptoUtils.randomUuid();
const request = createJsonRpcSubscriptionRequest(requestId, 'dwn.processMessage', subscriptionId, { target, message });
Expand Down
6 changes: 3 additions & 3 deletions packages/agent/src/rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ export class Web5RpcClient implements Web5Rpc {
constructor(clients: Web5Rpc[] = []) {
this.transportClients = new Map();

// include http client as default. can be overwritten for 'http:' or 'https:' if instantiator provides
// their own.
clients = [new HttpWeb5RpcClient(), ...clients];
// include http and socket clients as default.
// can be overwritten for 'http:', 'https:', 'ws: or ':wss' if instantiated with other clients.
clients = [new HttpWeb5RpcClient(), new WebSocketWeb5RpcClient(), ...clients];

for (let client of clients) {
for (let transportScheme of client.transportProtocols) {
Expand Down
7 changes: 5 additions & 2 deletions packages/agent/src/test-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { AbstractLevel } from 'abstract-level';

import { Level } from 'level';
import { LevelStore, MemoryStore } from '@web5/common';
import { DataStoreLevel, Dwn, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DataStoreLevel, Dwn, EventEmitterStream, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DidDht, DidJwk, DidResolutionResult, DidResolverCache, DidResolverCacheLevel } from '@web5/dids';

import type { Web5PlatformAgent } from './types/agent.js';
Expand Down Expand Up @@ -180,6 +180,8 @@ export class PlatformAgentTestHarness {
// Note: There is no in-memory store for DWN, so we always use LevelDB-based disk stores.
const dwnDataStore = new DataStoreLevel({ blockstoreLocation: testDataPath('DWN_DATASTORE') });
const dwnEventLog = new EventLogLevel({ location: testDataPath('DWN_EVENTLOG') });
const dwnEventStream = new EventEmitterStream();

const dwnMessageStore = new MessageStoreLevel({
blockstoreLocation : testDataPath('DWN_MESSAGESTORE'),
indexLocation : testDataPath('DWN_MESSAGEINDEX')
Expand All @@ -191,7 +193,8 @@ export class PlatformAgentTestHarness {
dataStore : dwnDataStore,
didResolver : didApi,
eventLog : dwnEventLog,
messageStore : dwnMessageStore
eventStream : dwnEventStream,
messageStore : dwnMessageStore,
});

// Instantiate Agent's DWN API using the custom DWN instance.
Expand Down
Loading

0 comments on commit b516a5f

Please sign in to comment.