Skip to content

Commit

Permalink
Websocket message queue using streams and Delay between messages (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
avnotaklu authored Feb 6, 2023
1 parent 5e75f3a commit 1efc86d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
4 changes: 4 additions & 0 deletions lib/src/config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ class Settings {

/// ICE Gathering Timeout (in millisecond).
int ice_gathering_timeout = 500;

/// Sip Message Delay (in millisecond) ( default 0 ).
int sip_message_delay = 0;
}


// Configuration checks.
class Checks {
Map<String, Null Function(Settings src, Settings? dst)> mandatory =
Expand Down
6 changes: 4 additions & 2 deletions lib/src/sip_ua_helper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ class SIPUAHelper extends EventManager {
// Reset settings
_settings = Settings();
WebSocketInterface socket = WebSocketInterface(
uaSettings.webSocketUrl, uaSettings.webSocketSettings);
uaSettings.webSocketUrl, messageDelay: _settings.sip_message_delay, webSocketSettings: uaSettings.webSocketSettings);
_settings.sockets = <WebSocketInterface>[socket];
_settings.uri = uaSettings.uri;
_settings.sip_message_delay = uaSettings.sip_message_delay;
_settings.realm = uaSettings.realm;
_settings.password = uaSettings.password;
_settings.ha1 = uaSettings.ha1;
Expand Down Expand Up @@ -698,7 +699,8 @@ class UaSettings {

/// ICE Gathering Timeout, default 500ms
int iceGatheringTimeout = 500;

/// Sip Message Delay (in millisecond) (default 0).
int sip_message_delay = 0;
List<Map<String, String>> iceServers = <Map<String, String>>[
<String, String>{'url': 'stun:stun.l.google.com:19302'},
// turn server configuration example.
Expand Down
20 changes: 15 additions & 5 deletions lib/src/transports/websocket_dart_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ typedef OnCloseCallback = void Function(int? code, String? reason);
typedef OnOpenCallback = void Function();

class WebSocketImpl {
WebSocketImpl(this._url);
WebSocketImpl(this._url, this.messageDelay);

final String _url;
WebSocket? _socket;
OnOpenCallback? onOpen;
OnMessageCallback? onMessage;
OnCloseCallback? onClose;

final int messageDelay;
void connect(
{Iterable<String>? protocols,
required WebSocketSettings webSocketSettings}) async {
handleQueue();
logger.i('connect $_url, ${webSocketSettings.extraHeaders}, $protocols');
try {
if (webSocketSettings.allowBadCertificate) {
Expand All @@ -42,11 +43,20 @@ class WebSocketImpl {
onClose?.call(500, e.toString());
}
}
final StreamController<dynamic> queue = StreamController<dynamic>.broadcast();
void handleQueue() async {
queue.stream.asyncMap((dynamic event) async {
await Future<void>.delayed(Duration(milliseconds: messageDelay));
return event;
}).listen((dynamic event) async {
_socket!.add(event);
logger.d('send: \n\n$event');
});
}

void send(dynamic data) {
void send(dynamic data) async {
if (_socket != null) {
_socket!.add(data);
logger.d('send: \n\n$data');
queue.add(data);
}
}

Expand Down
8 changes: 6 additions & 2 deletions lib/src/transports/websocket_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import 'websocket_dart_impl.dart'
if (dart.library.js) 'websocket_web_impl.dart';

class WebSocketInterface implements Socket {
WebSocketInterface(String url, [WebSocketSettings? webSocketSettings]) {
final int _messageDelay;

WebSocketInterface(String url,
{required int messageDelay, WebSocketSettings? webSocketSettings})
: _messageDelay = messageDelay {
logger.d('new() [url:$url]');
_url = url;
dynamic parsed_url = Grammar.parse(url, 'absoluteURI');
Expand Down Expand Up @@ -83,7 +87,7 @@ class WebSocketInterface implements Socket {
}
logger.d('connecting to WebSocket $_url');
try {
_ws = WebSocketImpl(_url!);
_ws = WebSocketImpl(_url!, _messageDelay);

_ws!.onOpen = () {
_closed = false;
Expand Down

0 comments on commit 1efc86d

Please sign in to comment.