Skip to content

Commit

Permalink
rpc, xeth: finish cleaning up xeth
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Apr 28, 2015
1 parent 2b9fd6b commit 978ffd3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
13 changes: 8 additions & 5 deletions rpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err

res, _ := api.xeth().DbGet([]byte(args.Database + args.Key))
*reply = newHexData(res)

case "shh_version":
// Retrieves the currently running whisper protocol version
*reply = api.xeth().WhisperVersion()

case "shh_post":
// Injects a new message into the whisper network
args := new(WhisperMessageArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
Expand All @@ -421,18 +424,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = true

case "shh_newIdentity":
// Creates a new whisper identity to use for sending/receiving messages
*reply = api.xeth().Whisper().NewIdentity()

case "shh_hasIdentity":
// Checks if an identity if owned or not
args := new(WhisperIdentityArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
}
*reply = api.xeth().Whisper().HasIdentity(args.Identity)

case "shh_newGroup", "shh_addToGroup":
return NewNotImplementedError(req.Method)

case "shh_newFilter":
// Create a new filter to watch and match messages with
args := new(WhisperFilterArgs)
Expand All @@ -443,6 +445,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexNum(big.NewInt(int64(id)).Bytes())

case "shh_uninstallFilter":
// Remove an existing filter watching messages
args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
Expand All @@ -455,15 +458,15 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
}
*reply = api.xeth().MessagesChanged(args.Id)
*reply = api.xeth().WhisperMessagesChanged(args.Id)

case "shh_getMessages":
// Retrieve all the cached messages matching a specific, existing filter
args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
}
*reply = api.xeth().Messages(args.Id)
*reply = api.xeth().WhisperMessages(args.Id)

// case "eth_register":
// // Placeholder for actual type
Expand Down
40 changes: 28 additions & 12 deletions xeth/xeth.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,44 +452,60 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin
return filter.Find()
}

// NewWhisperFilter creates and registers a new message filter to watch for
// inbound whisper messages. All parameters at this point are assumed to be
// HEX encoded.
func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int {
// Pre-define the id to be filled later
var id int
callback := func(msg WhisperMessage) {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()

// Callback to delegate core whisper messages to this xeth filter
callback := func(msg WhisperMessage) {
p.messagesMut.RLock() // Only read lock to the filter pool
defer p.messagesMut.RUnlock()
p.messages[id].insert(msg)
}
// Initialize the core whisper filter and wrap into xeth
id = p.Whisper().Watch(to, from, topics, callback)

p.messagesMut.Lock()
p.messages[id] = newWhisperFilter(id, p.Whisper())
p.messagesMut.Unlock()

return id
}

// UninstallWhisperFilter disables and removes an existing filter.
func (p *XEth) UninstallWhisperFilter(id int) bool {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()

if _, ok := p.messages[id]; ok {
delete(p.messages, id)
return true
}

return false
}

func (self *XEth) MessagesChanged(id int) []WhisperMessage {
self.messagesMut.Lock()
defer self.messagesMut.Unlock()
// WhisperMessages retrieves all the known messages that match a specific filter.
func (self *XEth) WhisperMessages(id int) []WhisperMessage {
self.messagesMut.RLock()
defer self.messagesMut.RUnlock()

if self.messages[id] != nil {
return self.messages[id].retrieve()
return self.messages[id].messages()
}
return nil
}

func (self *XEth) Messages(id int) []WhisperMessage {
self.messagesMut.Lock()
defer self.messagesMut.Unlock()
// WhisperMessagesChanged retrieves all the new messages matched by a filter
// since the last retrieval
func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage {
self.messagesMut.RLock()
defer self.messagesMut.RUnlock()

if self.messages[id] != nil {
return self.messages[id].messages()
return self.messages[id].retrieve()
}
return nil
}
Expand Down

0 comments on commit 978ffd3

Please sign in to comment.