Skip to content

Commit

Permalink
[WIP] Records Subscriptions in @web5/api (#522)
Browse files Browse the repository at this point in the history
Add API for subscribing to records.
  • Loading branch information
LiranCohen committed Aug 23, 2024
1 parent 3d1f825 commit 2f7bbbe
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/lovely-rules-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@web5/api": patch
---

Add `records.subscribe()` functionality to the DwnApi
60 changes: 58 additions & 2 deletions packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The SDK is currently still under active development, but having entered the Tech
- [API Documentation](#api-documentation)
- [Web5.connect](#web5connectoptions)
- [web5.dwn.records.query](#web5dwnrecordsqueryrequest)
- [web5.dwn.records.subscribe](#web5dwnrecordssubscriberequest)
- [web5.dwn.records.create](#web5dwnrecordscreaterequest)
- [web5.dwn.records.write](#web5dwnrecordswriterequest)
- [web5.dwn.records.read](#web5dwnrecordsreadrequest)
Expand Down Expand Up @@ -233,6 +234,59 @@ The query `response` contains the following properties:
- **`records`** - _`Records array`_ (_optional_): the array of `Records` returned if the request was successful.
- **`cursor`** - _`messageCid string`_ (_optional_): the `messageCid` of the last message returned in the results if there are exist additional records beyond the specified `limit` in the `query`.

### **`web5.dwn.records.subscribe(request)`**

Method for subscribing to either the locally connected DWeb Node or any remote DWeb Node specified in the `from` property.

```javascript
// This invocation will subscribe the user's own DWeb Nodes

const subscriptionHandler = (record) => {
console.log("received", record);
};

const { status } = await web5.dwn.records.subscribe({
message: {
filter: {
protocol: "https://schema.org/protocols/social",
},
},
subscriptionHandler,
});

console.log(status.code === 200); // successful subscription

// This invocation will query Bob's DWeb Nodes
const { status } = await web5.dwn.records.query({
from: "did:example:bob",
message: {
filter: {
protocol: "https://schema.org/protocols/social",
},
},
subscriptionHandler,
});

console.log(status.code === 200); // successful subscription
```

#### **Request**

The query `request` contains the following properties:

- **`from`** - _`DID string`_ (_optional_): the decentralized identifier of the DWeb Node the subscribe will receive results from.
- **`message`** - _`object`_: the properties of the DWeb Node Message Descriptor that will be used to construct a valid record subscription:
- **`filter`** - _`object`_: properties against which results of the subscription will be filtered:
- **`recordId`** - _`string`_ (_optional_): the record ID string that identifies the record data you are fetching.
- **`protocol`** - _`URI string`_ (_optional_): the URI of the protocol bucket in which to subscribe to.
- **`protocolPath`** - _`string`_ (_optional_): the path to the record in the protocol configuration.
- **`contextId`** _`string`_ (_optional_): the `recordId` of a root record of a protocol.
- **`parentId`** _`string`_ (_optional_): the `recordId` of a the parent of a protocol record.
- **`recipient`** - _`string`_ (_optional_): the DID in the `recipient` field of the record.
- **`schema`** - _`URI string`_ (_optional_): the URI of the schema bucket in which to subscribe to.
- **`dataFormat`** - _`Media Type string`_ (_optional_): the IANA string corresponding with the format of the data to filter for. See IANA's Media Type list here: https://www.iana.org/assignments/media-types/media-types.xhtml
- **`subscriptionHandler`** - _`function`_: The handler function which emits a `Record` object when any matching records arrive.

### **`web5.dwn.records.create(request)`**

Method for creating a new record and storing it in the user's local DWeb Node, remote DWeb Nodes, or another party's DWeb Nodes (if permitted).
Expand Down Expand Up @@ -497,7 +551,9 @@ metadata associated with a DID.
#### **Usage**

```javascript
const { didDocument } = await web5.did.resolve('did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo');
const { didDocument } = await web5.did.resolve(
"did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo"
);
```

#### **Parameters**
Expand All @@ -514,7 +570,7 @@ The method returns a DID resolution result as a JavaScript object. The structure

#### **Notes**

- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable
- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable
data registry or a resolver endpoint, which may introduce latency based on the network conditions and the specific DID
method utilized.

Expand Down
89 changes: 81 additions & 8 deletions packages/api/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import {
DwnMessage,
DwnResponse,
DwnMessageParams,
DwnMessageSubscription,
DwnResponseStatus,
CachedPermissions,
ProcessDwnRequest,
DwnPaginationCursor,
DwnDataEncodedRecordsWriteMessage,
AgentPermissionsApi
AgentPermissionsApi,
} from '@web5/agent';

import { isEmptyObject } from '@web5/common';
Expand All @@ -32,8 +32,7 @@ import { dataToBlob } from './utils.js';
import { Protocol } from './protocol.js';
import { PermissionGrant } from './permission-grant.js';
import { PermissionRequest } from './permission-request.js';
import { DwnMessagesPermissionScope } from '@web5/agent';
import { DwnRecordsPermissionScope } from '@web5/agent';
import { SubscriptionUtil } from './subscription-util.js';

/**
* Represents the request payload for fetching permission requests from a Decentralized Web Node (DWN).
Expand Down Expand Up @@ -179,7 +178,7 @@ export type RecordsQueryResponse = DwnResponseStatus & {

/** If there are additional results, the messageCid of the last record will be returned as a pagination cursor. */
cursor?: DwnPaginationCursor;
};
}

/**
* Represents a request to read a specific record from a Decentralized Web Node (DWN).
Expand All @@ -206,7 +205,36 @@ export type RecordsReadRequest = {
export type RecordsReadResponse = DwnResponseStatus & {
/** The record retrieved by the read operation. */
record: Record;
};
}

/** Subscription handler for Records */
export type RecordsSubscriptionHandler = (record: Record) => void;

/**
* Represents a request to subscribe to records from a Decentralized Web Node (DWN).
*
* This request type is used to specify the target DWN from which records matching the subscription
* criteria should be emitted. It's useful for being notified in real time when records are written, deleted or modified.
*/
export type RecordsSubscribeRequest = {
/** Optional DID specifying the remote target DWN tenant to subscribe from. */
from?: string;

/** The parameters for the subscription operation, detailing the criteria for the subscription filter */
message: Omit<DwnMessageParams[DwnInterface.RecordsSubscribe], 'signer'>;

/** The handler to process the subscription events */
subscriptionHandler: RecordsSubscriptionHandler;
}

/** Encapsulates the response from a DWN RecordsSubscriptionRequest */
export type RecordsSubscribeResponse = DwnResponseStatus & {
/**
* Represents the subscription that was created. Includes an ID and the close method to stop the subscription.
*
* */
subscription?: DwnMessageSubscription;
}

/**
* Defines a request to write (create) a record to a Decentralized Web Node (DWN).
Expand Down Expand Up @@ -252,7 +280,7 @@ export type RecordsWriteResponse = DwnResponseStatus & {
* DWN as a result of the write operation.
*/
record?: Record
};
}

/**
* Interface to interact with DWN Records and Protocols
Expand Down Expand Up @@ -582,7 +610,6 @@ export class DwnApi {

return { status };
},

/**
* Query a single or multiple records based on the given filter
*/
Expand Down Expand Up @@ -733,6 +760,52 @@ export class DwnApi {
return { record, status };
},

/**
* Subscribes to records based on the given filter and emits events to the `subscriptionHandler`.
*
* @param request must include the `message` with the subscription filter and the `subscriptionHandler` to process the events.
* @returns the subscription status and the subscription object used to close the subscription.
*/
subscribe: async (request: RecordsSubscribeRequest): Promise<RecordsSubscribeResponse> => {
const agentRequest: ProcessDwnRequest<DwnInterface.RecordsSubscribe> = {
/**
* The `author` is the DID that will sign the message and must be the DID the Web5 app is
* connected with and is authorized to access the signing private key of.
*/
author : this.connectedDid,
messageParams : request.message,
messageType : DwnInterface.RecordsSubscribe,
/**
* The `target` is the DID of the DWN tenant under which the subscribe operation will be executed.
* If `from` is provided, the subscribe operation will be executed on a remote DWN.
* Otherwise, the local DWN will execute the subscribe operation.
*/
target : request.from || this.connectedDid,

/**
* The handler to process the subscription events.
*/
subscriptionHandler: SubscriptionUtil.recordSubscriptionHandler({
agent : this.agent,
connectedDid : this.connectedDid,
request
})
};

let agentResponse: DwnResponse<DwnInterface.RecordsSubscribe>;

if (request.from) {
agentResponse = await this.agent.sendDwnRequest(agentRequest);
} else {
agentResponse = await this.agent.processDwnRequest(agentRequest);
}

const reply = agentResponse.reply;
const { status, subscription } = reply;

return { status, subscription };
},

/**
* Writes a record to the DWN
*
Expand Down
29 changes: 26 additions & 3 deletions packages/api/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
DwnDateSort,
DwnPaginationCursor,
isDwnMessage,
SendDwnRequest
SendDwnRequest,
isRecordsWrite
} from '@web5/agent';

import { Convert, isEmptyObject, NodeStream, removeUndefinedProperties, Stream } from '@web5/common';
Expand Down Expand Up @@ -72,10 +73,22 @@ export type RecordModel = ImmutableRecordProperties & OptionalRecordProperties &
*
* @beta
*/
export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite] & {
export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete] & {
/** The DID that signed the record. */
author: string;

/** The attestation signature(s) for the record. */
attestation?: DwnMessage[DwnInterface.RecordsWrite]['attestation'];

/** The encryption information for the record. */
encryption?: DwnMessage[DwnInterface.RecordsWrite]['encryption'];

/** The contextId associated with the record. */
contextId?: string;

/** The unique identifier of the record */
recordId?: string;

/** The DID of the DWN tenant under which record operations are being performed. */
connectedDid: string;

Expand Down Expand Up @@ -360,7 +373,7 @@ export class Record implements RecordModel {
this._descriptor = options.descriptor;
this._encryption = options.encryption;
this._initialWrite = options.initialWrite;
this._recordId = options.recordId;
this._recordId = this.isRecordsDeleteDescriptor(options.descriptor) ? options.descriptor.recordId : options.recordId;
this._protocolRole = options.protocolRole;

if (options.encodedData) {
Expand Down Expand Up @@ -539,6 +552,7 @@ export class Record implements RecordModel {
/**
* Send the current record to a remote DWN by specifying their DID
* If no DID is specified, the target is assumed to be the owner (connectedDID).
*
* If an initial write is present and the Record class send cache has no awareness of it, the initial write is sent first
* (vs waiting for the regular DWN sync)
*
Expand Down Expand Up @@ -949,4 +963,13 @@ export class Record implements RecordModel {
}
}
}

/**
* Checks if the descriptor is a RecordsDelete descriptor.
*
* @param descriptor a RecordsWrite or RecordsDelete descriptor
*/
private isRecordsDeleteDescriptor(descriptor: DwnMessageDescriptor[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete]): descriptor is DwnMessageDescriptor[DwnInterface.RecordsDelete] {
return descriptor.interface + descriptor.method === DwnInterface.RecordsDelete;
}
}
33 changes: 33 additions & 0 deletions packages/api/src/subscription-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { DwnRecordSubscriptionHandler, getRecordAuthor, Web5Agent } from '@web5/agent';
import { RecordsSubscribeRequest } from './dwn-api.js';
import { Record } from './record.js';

/**
* Utility class for dealing with subscriptions.
*/
export class SubscriptionUtil {
/**
* Creates a record subscription handler that can be used to process incoming {Record} messages.
*/
static recordSubscriptionHandler({ agent, connectedDid, request }:{
agent: Web5Agent;
connectedDid: string;
request: RecordsSubscribeRequest;
}): DwnRecordSubscriptionHandler {
const { subscriptionHandler, from: remoteOrigin } = request;

return async (event) => {
const { message, initialWrite } = event;
const author = getRecordAuthor(message);
const recordOptions = {
author,
connectedDid,
remoteOrigin,
initialWrite
};

const record = new Record(agent, { ...message, ...recordOptions });
subscriptionHandler(record);
};
}
}
Loading

0 comments on commit 2f7bbbe

Please sign in to comment.