Skip to content

Commit

Permalink
feat($compile): Added server event
Browse files Browse the repository at this point in the history
Added an api method for getting an observable of events. The Observable acts as a stream of events
from the server.
  • Loading branch information
tomyitav committed Apr 19, 2018
1 parent d4cc76a commit 6a6cabc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "redis-messaging-manager",
"version": "0.0.5",
"version": "0.0.6",
"description": "Pubsub messaging library, using redis and rxjs 5",
"keywords": ["redis", "rxjs", "pubsub"],
"main": "dist/redis-messaging-manager.umd.js",
Expand Down
40 changes: 31 additions & 9 deletions src/redis-messaging-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ export class PubsubManager {
private options: Redis.RedisOptions
private redisClient: Redis.Redis
private topicMaps: Map<string, Observable<any>>
private serverEventsToObservables: Map<ServerEvent, Observable<any>>

constructor(host?: string, port?: number, options?: Redis.RedisOptions) {
this.options = Object.assign(
{},
this.getDefaultOptions(host, port),
options
)
constructor(options?: Redis.RedisOptions) {
this.options = Object.assign({}, this.getDefaultOptions(), options)
this.redisClient = new Redis(this.options)
this.topicMaps = new Map()
this.serverEventsToObservables = new Map()
}

public publish(topic: string, message: string): Promise<number> {
Expand All @@ -27,6 +25,12 @@ export class PubsubManager {
: this.createNewTopicObservable(topic)
}

public getServerEventStream(eventName: ServerEvent): Observable<any> {
return this.serverEventsToObservables.has(eventName)
? this.serverEventsToObservables.get(eventName)
: this.createNewEventObservable(eventName)
}

public unsubscribe(topic: string) {
this.redisClient.unsubscribe(topic)
this.topicMaps.delete(topic)
Expand Down Expand Up @@ -59,10 +63,20 @@ export class PubsubManager {
return newObservable
}

private getDefaultOptions(host?: string, port?: number): Redis.RedisOptions {
private createNewEventObservable(event: ServerEvent): Observable<any> {
let eventObservable = Observable.create(observer => {
this.redisClient.on(event, () => {
observer.next()
})
})
this.serverEventsToObservables.set(event, eventObservable)
return eventObservable
}

private getDefaultOptions(): Redis.RedisOptions {
return {
host: host || 'localhost',
port: port || 6379,
host: 'localhost',
port: 6379,
retryStrategy: times => {
let delay = Math.min(100 + times * 2, 2000)
return delay
Expand All @@ -71,3 +85,11 @@ export class PubsubManager {
}
}
}

export type ServerEvent =
| 'connect'
| 'ready'
| 'error'
| 'close'
| 'reconnecting'
| 'end'

0 comments on commit 6a6cabc

Please sign in to comment.