Skip to content

Commit

Permalink
add push
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Aug 7, 2024
1 parent 8494ffe commit 2b5dd09
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
34 changes: 19 additions & 15 deletions realtime/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple

from realtime.message import ChannelEvents
from realtime.push import Push
from realtime.types import Callback

from .presence import RealtimePresence
Expand Down Expand Up @@ -230,23 +232,25 @@ def rejoin(self) -> None:
):
self.channel_params["filter"] = self.filter

self._push("phx_join", {"config": self.channel_params})

def _push(self, event: str, payload: dict) -> None:
message = {
"topic": self.topic,
"event": event,
"payload": payload,
"ref": None,
}
access_token_payload = {}

try:
asyncio.get_event_loop().run_until_complete(
self.socket.ws_connection.send(json.dumps(message))
if self.socket._access_token is not None:
access_token_payload["access_token"] = self.socket._access_token

self._push(
ChannelEvents.join,
{"config": self.channel_params, "access_token": access_token_payload},
)

def _push(self, event: str, payload: Dict[str, Any]) -> Push:
if not self.joined:
raise Exception(
f"tried to push '{event}' to '{self.topic}' before joining. Use channel.subscribe() before pushing events"
)
except Exception as e:
print(e)
return

push = Push(self, event, payload)
push.send()
return push

# @Deprecated:
# You should use `subscribe` instead of this low-level method. It will be removed in the future.
Expand Down
7 changes: 6 additions & 1 deletion realtime/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(
self.hb_interval = hb_interval
self.ws_connection: websockets.client.WebSocketClientProtocol
self.kept_alive = False
self.ref = 0
self.auto_reconnect = auto_reconnect

self.channels: DefaultDict[str, List[Channel]] = defaultdict(list)
Expand Down Expand Up @@ -207,4 +208,8 @@ def set_auth(self, token: Union[str, None]) -> None:
for _, channels in self.channels.items():
for channel in channels:
if channel.joined:
channel._push("access_token", {"access_token": token})
channel._push(ChannelEvents.access_token, {"access_token": token})

def _make_ref(self) -> str:
self.ref += 1
return f"{self.ref}"
2 changes: 1 addition & 1 deletion realtime/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ChannelEvents(str, Enum):
reply = "phx_reply"
leave = "phx_leave"
heartbeat = "heartbeat"
auth = "phx_auth"
access_token = "access_token"


PHOENIX_CHANNEL = "phoenix"
Expand Down
36 changes: 36 additions & 0 deletions realtime/push.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import json
from typing import Any, Dict
from realtime.channel import Channel
import logging

class Push:
def __init__(self, channel: Channel, event: str, payload: Dict[str, Any] = {}):
self.channel = channel
self.event = event
self.payload = payload
self.ref = ""


def send(self):
asyncio.get_event_loop().run_until_complete(self._send())

async def _send(self):
self.ref = self.channel.socket._make_ref()

message = {
"topic": self.channel.topic,
"event": self.event,
"payload": self.payload,
"ref": self.ref,
}

try:
await self.socket.ws_connection.send(json.dumps(message))
except Exception as e:
logging.error(f"send push failed: {e}")



def update_payload(self, payload: Dict[str, Any]):
self.payload = { **self.payload, **payload }
File renamed without changes.

0 comments on commit 2b5dd09

Please sign in to comment.