Skip to content

Commit

Permalink
Merge branch 'master' into topic/rdesc-obj-str
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe authored Sep 18, 2021
2 parents d290b76 + 2fd0e36 commit d21ec86
Show file tree
Hide file tree
Showing 44 changed files with 1,422 additions and 313 deletions.
13 changes: 12 additions & 1 deletion bindings/go/src/ucx/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type UcpSendCallback = func(request *UcpRequest, status UcsStatus)

type UcpTagRecvCallback = func(request *UcpRequest, status UcsStatus, tagInfo *UcpTagRecvInfo)

// This callback routine is invoked on the server side to handle incoming
// connections from remote clients.
type UcpListenerConnectionHandler = func(connRequest *UcpConnectionRequest)

// Map from the callback id that is passed to C to the actual go callback.
var callback_map = make(map[uint64]UcpCallback)

Expand All @@ -44,9 +48,16 @@ func deregister(id uint64) (UcpCallback, bool) {
return val, ret
}

func getCallback(id uint64) (UcpCallback, bool) {
mu.Lock()
defer mu.Unlock()
val, ret := callback_map[id]
return val, ret
}

//export ucxgo_completeGoSendRequest
func ucxgo_completeGoSendRequest(request unsafe.Pointer, status C.ucs_status_t, callbackId unsafe.Pointer) {
if callback, found := deregister(*(*uint64)(callbackId)); found {
if callback, found := deregister(uint64(uintptr(callbackId))); found {
callback.(UcpSendCallback)(&UcpRequest{
request: request,
Status: UcsStatus(status),
Expand Down
21 changes: 21 additions & 0 deletions bindings/go/src/ucx/connection_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

package ucx

// #include <ucp/api/ucp.h>
import "C"

type UcpConnectionRequest struct {
connRequest C.ucp_conn_request_h
listener C.ucp_listener_h
}

func (c *UcpConnectionRequest) Reject() error {
if status := C.ucp_listener_reject(c.listener, c.connRequest); status != C.UCS_OK {
return NewUcxError(status)
}
return nil
}
42 changes: 37 additions & 5 deletions bindings/go/src/ucx/endpoint_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ package ucx
// #include <ucp/api/ucp.h>
// #include "goucx.h"
import "C"
import (
"net"
"runtime"
"unsafe"
)

// Tuning parameters for the UCP endpoint.
type UcpEpParams struct {
Expand All @@ -23,11 +28,13 @@ type UcpEpParams struct {
type UcpEpErrHandler func(ep *UcpEp, status UcsStatus)

//export ucxgo_completeGoErrorHandler
func ucxgo_completeGoErrorHandler(ep C.ucp_ep_h, status C.ucs_status_t) {
errHandleGoCallback := errorHandles[ep]
errHandleGoCallback(&UcpEp{
ep: ep,
}, UcsStatus(status))
func ucxgo_completeGoErrorHandler(user_data unsafe.Pointer, ep C.ucp_ep_h, status C.ucs_status_t) {
errHandleGoCallback, found := errorHandles[ep]
if found {
errHandleGoCallback(&UcpEp{
ep: ep,
}, UcsStatus(status))
}
}

// Destination address
Expand Down Expand Up @@ -62,3 +69,28 @@ func (p *UcpEpParams) SetName(name string) *UcpEpParams {
p.params.field_mask |= C.UCP_EP_PARAM_FIELD_NAME
return p
}

// Destination address in the form of a sockaddr; means
// that this type of the endpoint creation is possible only on client side
// in client-server connection establishment flow.
func (p *UcpEpParams) SetSocketAddress(a *net.TCPAddr) (*UcpEpParams, error) {
sockAddr, error := toSockAddr(a)
if error != nil {
return nil, error
}

p.params.sockaddr = *sockAddr
runtime.SetFinalizer(p, func(f *UcpEpParams) { FreeNativeMemory(unsafe.Pointer(f.params.sockaddr.addr)) })
p.params.flags |= C.UCP_EP_PARAMS_FLAGS_CLIENT_SERVER
p.params.field_mask |= C.UCP_EP_PARAM_FIELD_SOCK_ADDR | C.UCP_EP_PARAM_FIELD_FLAGS
return p, nil
}

// Connection request from client; means that this type of the endpoint
// creation is possible only on server side in client-server connection
// establishment flow.
func (p *UcpEpParams) SetConnRequest(c *UcpConnectionRequest) *UcpEpParams {
p.params.conn_request = c.connRequest
p.params.field_mask |= C.UCP_EP_PARAM_FIELD_CONN_REQUEST
return p
}
4 changes: 3 additions & 1 deletion bindings/go/src/ucx/goucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ extern void ucxgo_completeGoSendRequest(void *request, ucs_status_t status, voi

extern void ucxgo_completeGoTagRecvRequest(void *request, ucs_status_t status, ucp_tag_recv_info_t *info, void *calback_id);

extern void ucxgo_completeGoErrorHandler(ucp_ep_h ep, ucs_status_t status);
extern void ucxgo_completeGoErrorHandler(void* arg, ucp_ep_h ep, ucs_status_t status);

extern void ucxgo_completeConnHandler(ucp_conn_request_h conn_request, void *calback_id);
50 changes: 50 additions & 0 deletions bindings/go/src/ucx/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2021, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

package ucx

// #include <ucp/api/ucp.h>
import "C"
import "net"

type UcpListener struct {
listener C.ucp_listener_h
connHandlerId uint64
}

// Needed to call connHandler.Reject() rather than listener.Reject(connHandler)
var connHandles2Listener = make(map[uint64]C.ucp_listener_h)

type UcpListenerAttributes struct {
Address *net.TCPAddr
}

func (l *UcpListener) Close() {
C.ucp_listener_destroy(l.listener)
deregister(l.connHandlerId)
delete(connHandles2Listener, l.connHandlerId)
}

func (l *UcpListener) Query(attrs ...UcpListenerAttribute) (*UcpListenerAttributes, error) {
var listenerAttr C.ucp_listener_attr_t

for _, attr := range attrs {
listenerAttr.field_mask |= C.ulong(attr)
}

if status := C.ucp_listener_query(l.listener, &listenerAttr); status != C.UCS_OK {
return nil, NewUcxError(status)
}

result := &UcpListenerAttributes{}

for _, attr := range attrs {
switch attr {
case UCP_LISTENER_ATTR_FIELD_SOCKADDR:
result.Address = toTcpAddr(&listenerAttr.sockaddr)
}
}
return result, nil
}
59 changes: 59 additions & 0 deletions bindings/go/src/ucx/listener_params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2021, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

package ucx

// #include <ucp/api/ucp.h>
// #include "goucx.h"
import "C"
import (
"net"
"runtime"
"unsafe"
)

// Tuning parameters for the UCP listener.
type UcpListenerParams struct {
params C.ucp_listener_params_t
connHandlerId uint64
}

//export ucxgo_completeConnHandler
func ucxgo_completeConnHandler(connRequest C.ucp_conn_request_h, cbId unsafe.Pointer) {
id := uint64(uintptr((cbId)))
if callback, found := getCallback(id); found {
listener := connHandles2Listener[id]
callback.(UcpListenerConnectionHandler)(&UcpConnectionRequest{
connRequest: connRequest,
listener: listener,
})
}
}

// Destination address
func (p *UcpListenerParams) SetSocketAddress(a *net.TCPAddr) (*UcpListenerParams, error) {
sockAddr, error := toSockAddr(a)
if error != nil {
return nil, error
}

p.params.field_mask |= C.UCP_LISTENER_PARAM_FIELD_SOCK_ADDR
p.params.sockaddr = *sockAddr
runtime.SetFinalizer(p, func(f *UcpListenerParams) { FreeNativeMemory(unsafe.Pointer(f.params.sockaddr.addr)) })
return p, nil
}

// Handler of an incoming connection request in a client-server connection flow.
func (p *UcpListenerParams) SetConnectionHandler(connHandler UcpListenerConnectionHandler) *UcpListenerParams {
var ucpConnHndl C.ucp_listener_conn_handler_t
cbId := register(connHandler)

p.connHandlerId = cbId
ucpConnHndl.arg = unsafe.Pointer(uintptr(cbId))
ucpConnHndl.cb = (C.ucp_listener_conn_callback_t)(C.ucxgo_completeConnHandler)
p.params.field_mask |= C.UCP_LISTENER_PARAM_FIELD_CONN_HANDLER
p.params.conn_handler = ucpConnHndl
return p
}
6 changes: 6 additions & 0 deletions bindings/go/src/ucx/ucp_contsants.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ const (
UCP_WORKER_ATTR_FIELD_NAME UcpWorkerAttribute = C.UCP_WORKER_ATTR_FIELD_NAME
UCP_WORKER_ATTR_FIELD_MAX_INFO_STRING UcpWorkerAttribute = C.UCP_WORKER_ATTR_FIELD_MAX_INFO_STRING
)

type UcpListenerAttribute uint32

const (
UCP_LISTENER_ATTR_FIELD_SOCKADDR UcpListenerAttribute = C.UCP_LISTENER_ATTR_FIELD_SOCKADDR
)
96 changes: 95 additions & 1 deletion bindings/go/src/ucx/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@

package ucx

// #include <arpa/inet.h>
// #include <netinet/in.h>
// #include <stdlib.h>
// #include <string.h>
// #include <sys/socket.h>
// #include <ucp/api/ucp.h>
import "C"
import "unsafe"
import (
"net"
"syscall"
"unsafe"
)

func AllocateNativeMemory(size uint64) unsafe.Pointer {
return C.malloc(C.ulong(size))
Expand All @@ -24,3 +33,88 @@ func CBytes(data string) unsafe.Pointer {
func GoBytes(p unsafe.Pointer, n uint64) []byte {
return C.GoBytes(p, C.int(n))
}

func family(a *net.TCPAddr) int {
if a == nil || len(a.IP) <= net.IPv4len {
return syscall.AF_INET
}
if a.IP.To4() != nil {
return syscall.AF_INET
}
return syscall.AF_INET6
}

// Convert GO TCP address to C
func toSockAddr(a *net.TCPAddr) (*C.ucs_sock_addr_t, error) {
// We can't assing to ucs_sock_addr_t->addr reference to Go's memory,
// so need to allocate
sockaddrPtr := AllocateNativeMemory(C.sizeof_struct_sockaddr_storage)
sa := (*C.struct_sockaddr_storage)(sockaddrPtr)
var result C.ucs_sock_addr_t
if a == nil {
a = &net.TCPAddr{}
}
switch family(a) {
case syscall.AF_INET:
if len(a.IP) == 0 {
a.IP = net.IPv4zero
}
ip4 := a.IP.To4()
if ip4 == nil {
return nil, &net.AddrError{Err: "non-IPv4 address", Addr: a.IP.String()}
}
sa.ss_family = C.AF_INET
var sin *C.struct_sockaddr_in = (*C.struct_sockaddr_in)(unsafe.Pointer(sa))
sin.sin_port = C.htons(C.ushort(a.Port))
charBytes := C.CString(ip4.String())
C.inet_pton(C.AF_INET, charBytes, unsafe.Pointer(&sin.sin_addr.s_addr))
FreeNativeMemory(unsafe.Pointer(charBytes))
case syscall.AF_INET6:
// In general, an IP wildcard address, which is either
// "0.0.0.0" or "::", means the entire IP addressing
// space. For some historical reason, it is used to
// specify "any available address" on some operations
// of IP node.
//
// When the IP node supports IPv4-mapped IPv6 address,
// we allow a listener to listen to the wildcard
// address of both IP addressing spaces by specifying
// IPv6 wildcard address.
if len(a.IP) == 0 || a.IP.Equal(net.IPv4zero) {
a.IP = net.IPv6zero
}
// We accept any IPv6 address including IPv4-mapped
// IPv6 address.
ip6 := a.IP.To16()
if ip6 == nil {
return nil, &net.AddrError{Err: "non-IPv6 address", Addr: a.IP.String()}
}
sa.ss_family = C.AF_INET6
var sin6 *C.struct_sockaddr_in6 = (*C.struct_sockaddr_in6)(unsafe.Pointer(sa))
sin6.sin6_port = C.htons(C.ushort(a.Port))
charBytes := C.CString(ip6.String())
C.inet_pton(C.AF_INET6, charBytes, unsafe.Pointer(&sin6.sin6_addr))
FreeNativeMemory(unsafe.Pointer(charBytes))
default:
return nil, &net.AddrError{Err: "invalid address family", Addr: a.IP.String()}
}
result.addrlen = C.socklen_t(len(a.IP))
result.addr = (*C.struct_sockaddr)(unsafe.Pointer(sa))
return &result, nil
}

// Convert sockaddr_storage to go TCPAddr
func toTcpAddr(sockaddr *C.struct_sockaddr_storage) *net.TCPAddr {
result := &net.TCPAddr{}
if sockaddr.ss_family == C.AF_INET6 {
var sin6 *C.struct_sockaddr_in6 = (*C.struct_sockaddr_in6)(unsafe.Pointer(sockaddr))
result.Port = int(C.ntohs(sin6.sin6_port))
C.memcpy(unsafe.Pointer(&result.IP), unsafe.Pointer(&sin6.sin6_addr), 16)
} else {
var sin *C.struct_sockaddr_in = (*C.struct_sockaddr_in)(unsafe.Pointer(sockaddr))
result.Port = int(C.ntohs(sin.sin_port))
C.memcpy(unsafe.Pointer(&result.IP), unsafe.Pointer(&sin.sin_addr), 4)
}

return result
}
13 changes: 13 additions & 0 deletions bindings/go/src/ucx/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,16 @@ func (w *UcpWorker) RecvTagNonBlocking(address unsafe.Pointer, size uint64,
Length: uint64(recvInfo.length),
})
}

// This routine creates new UcpListener.
func (w *UcpWorker) NewListener(listenerParams *UcpListenerParams) (*UcpListener, error) {
var listener C.ucp_listener_h

if status := C.ucp_listener_create(w.worker, &listenerParams.params, &listener); status != C.UCS_OK {
return nil, NewUcxError(status)
}

connHandles2Listener[listenerParams.connHandlerId] = listener

return &UcpListener{listener, listenerParams.connHandlerId}, nil
}
Loading

0 comments on commit d21ec86

Please sign in to comment.