feat: In cluster dialer to proxy TCP connections to unexposed services #688

Merged: 4 commits, Dec 6, 2021
2 changes: 2 additions & 0 deletions go.sum
@@ -405,6 +405,7 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -871,6 +872,7 @@ github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxd
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/sys/mount v0.2.0 h1:WhCW5B355jtxndN5ovugJlMFJawbUODuW8fSnEH6SSM=
github.com/moby/sys/mount v0.2.0/go.mod h1:aAivFE2LB3W4bACsUXChRHQ0qKWsetY4Y9V7sxOougM=
324 changes: 324 additions & 0 deletions k8s/dialer.go
@@ -0,0 +1,324 @@
package k8s

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"time"

coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

const (
	socatImage = "quay.io/mvasek/socat:alpine"
)

// ContextDialer matches net.Dialer's DialContext signature and adds Close for
// releasing the resources backing the dialer.
type ContextDialer interface {
	DialContext(ctx context.Context, network string, addr string) (net.Conn, error)
	Close() error
}

// NewInClusterDialer creates a context dialer that dials TCP connections via a pod running in the k8s cluster.
// This is useful for accessing k8s services that are not exposed outside the cluster (e.g. the OpenShift image registry).
//
// Usage:
//
//	dialer, err := k8s.NewInClusterDialer(ctx)
//	if err != nil {
//		return err
//	}
//	defer dialer.Close()
//
//	transport := &http.Transport{
//		DialContext: dialer.DialContext,
//	}
//
//	var client = http.Client{
//		Transport: transport,
//	}
func NewInClusterDialer(ctx context.Context) (ContextDialer, error) {
	c := &contextDialer{}
	err := c.startDialerPod(ctx)
	if err != nil {
		return nil, err
	}
	return c, nil
}

type contextDialer struct {
	coreV1    v1.CoreV1Interface
	restConf  *restclient.Config
	podName   string
	namespace string
}

func (c *contextDialer) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
	if !(network == "tcp" || network == "tcp4" || network == "tcp6") {
		return nil, fmt.Errorf("unsupported network: %q", network)
	}

	// The conn's Read/Write ends are backed by pipes whose far ends are wired
	// to the stdio of a socat process exec'd in the dialer pod.
	execDone := make(chan struct{})
	pr, pw, conn := newConn(execDone)

	go func() {
		defer close(execDone)
		errOut := bytes.NewBuffer(nil)
		err := c.exec(addr, pr, pw, errOut)
		if err != nil {
			// Poison both pipes so pending Reads/Writes observe the exec error.
			err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, errOut.String())
			_ = pr.CloseWithError(err)
			_ = pw.CloseWithError(err)
		}
	}()

	return conn, nil
}

func (c *contextDialer) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
	defer cancel()
	delOpts := metaV1.DeleteOptions{}

	return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts)
Contributor Author:
@rhuss @markusthoemmes is there a way to make the pod be deleted automatically upon completion?

Contributor:
You can do this with a higher-level abstraction like a Deployment or ReplicaSet if you want it to manage the lifecycle of your pod; otherwise, you have to do it on your own.

Contributor:
By the way, what do you mean by 'upon completion'? When all containers in the pod stop?

Contributor Author:
Yes, when all processes have exited.

Contributor Author:
If the app crashes for some reason, Close() may not be called, so there would be dangling completed pods.
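One possible mitigation (a sketch, not part of this PR): set ActiveDeadlineSeconds on the pod spec so the kubelet terminates the pod after a fixed budget even if Close() is never called. The deadline value below is an assumption, and the failed Pod object itself would still need to be deleted eventually.

	// Hypothetical guard against leaked dialer pods: after the deadline the
	// kubelet marks the pod Failed and kills its containers.
	deadline := int64(600) // assumed 10-minute budget per dialer session
	pod.Spec.ActiveDeadlineSeconds = &deadline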

}

func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
	cliConf := GetClientConfig()
	c.restConf, err = cliConf.ClientConfig()
	if err != nil {
		return
	}

	err = setConfigDefaults(c.restConf)
	if err != nil {
		return
	}

	client, err := kubernetes.NewForConfig(c.restConf)
	if err != nil {
		return
	}
	c.coreV1 = client.CoreV1()

	c.namespace, err = GetNamespace("")
	if err != nil {
		return
	}

	pods := client.CoreV1().Pods(c.namespace)

	c.podName = "in-cluster-dialer-" + rand.String(5)

	defer func() {
		if err != nil {
			c.Close()
		}
	}()

	// The container just sleeps; DialContext later execs socat in it to open
	// TCP connections from inside the cluster.
	pod := &coreV1.Pod{
		ObjectMeta: metaV1.ObjectMeta{
			Name:        c.podName,
			Labels:      nil,
			Annotations: nil,
		},
		Spec: coreV1.PodSpec{
			Containers: []coreV1.Container{
				{
					Name:    c.podName,
					Image:   socatImage,
					Stdin:   true,
					Command: []string{"sleep", "infinity"},
				},
			},
			DNSPolicy:     coreV1.DNSClusterFirst,
			RestartPolicy: coreV1.RestartPolicyNever,
		},
	}
	creatOpts := metaV1.CreateOptions{}

	// Start watching for readiness before creating the pod so the ready event
	// cannot be missed.
	ready := c.podReady(ctx)

	_, err = pods.Create(ctx, pod, creatOpts)
	if err != nil {
		return
	}

	select {
	case err = <-ready:
	case <-ctx.Done():
		err = ctx.Err()
	case <-time.After(time.Minute * 1):
		err = errors.New("timeout")
	}

	if err != nil {
		err = fmt.Errorf("failed to start dialer container: %w", err)
	}

	return err
}

// exec runs `socat - TCP:<hostPort>` in the dialer pod over SPDY, wiring the
// given reader/writers to the remote process's stdio.
func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Writer) error {
	restClient := c.coreV1.RESTClient()
	req := restClient.Post().
		Resource("pods").
		Name(c.podName).
		Namespace(c.namespace).
		SubResource("exec")
	req.VersionedParams(&coreV1.PodExecOptions{
		Command:   []string{"socat", "-", fmt.Sprintf("TCP:%s", hostPort)},
		Container: c.podName,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(c.restConf, "POST", req.URL())
	if err != nil {
		return err
	}

	return executor.Stream(remotecommand.StreamOptions{
		Stdin:  in,
		Stdout: out,
		Stderr: errOut,
		Tty:    false,
	})
}
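For intuition, each dial performs roughly the same exec one could run by hand; the pod name and target address below are illustrative only:

	kubectl exec -i in-cluster-dialer-ab1cd -- socat - TCP:10.96.0.10:5000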

// podReady returns a channel that receives nil once the dialer pod reports
// ready, or an error if its container cannot be started. The channel is
// buffered so sends never block and a Watch error is reported immediately
// instead of being swallowed.
func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
	d := make(chan error, 1)
	errChan = d

	pods := c.coreV1.Pods(c.namespace)

	nameSelector := fields.OneTermEqualSelector("metadata.name", c.podName).String()
	listOpts := metaV1.ListOptions{
		Watch:         true,
		FieldSelector: nameSelector,
	}
	watcher, err := pods.Watch(ctx, listOpts)
	if err != nil {
		d <- err
		return
	}

	go func() {
		defer watcher.Stop()
		ch := watcher.ResultChan()
		for event := range ch {
			pod, ok := event.Object.(*coreV1.Pod)
			if !ok {
				continue
			}

			if event.Type == watch.Modified {
				for _, status := range pod.Status.ContainerStatuses {
					if status.Ready {
						d <- nil
						return
					}
					if status.State.Waiting != nil {
						switch status.State.Waiting.Reason {
						case "ErrImagePull",
							"CreateContainerError",
							"CreateContainerConfigError",
							"InvalidImageName",
							"CrashLoopBackOff",
							"ImagePullBackOff":
							d <- fmt.Errorf("reason: %v, message: %v",
								status.State.Waiting.Reason,
								status.State.Waiting.Message)
							return
						default:
							continue
						}
					}
				}
			}
		}
	}()

	return
}

func setConfigDefaults(config *restclient.Config) error {
	gv := coreV1.SchemeGroupVersion
	config.GroupVersion = &gv
	config.APIPath = "/api"
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	if config.UserAgent == "" {
		config.UserAgent = restclient.DefaultKubernetesUserAgent()
	}

	return nil
}

// addr is a synthetic net.Addr for the pipe-backed connection.
type addr struct{}

func (a addr) Network() string {
	return "pod-stdio"
}

func (a addr) String() string {
	return "pod-stdio"
}

// conn adapts the two stdio pipes to the net.Conn interface.
type conn struct {
	pr       *io.PipeReader
	pw       *io.PipeWriter
	execDone <-chan struct{}
}

func (c conn) Read(b []byte) (n int, err error) {
	return c.pr.Read(b)
}

func (c conn) Write(b []byte) (n int, err error) {
	return c.pw.Write(b)
}

// Close closes the write side (signalling EOF to socat's stdin), waits for the
// exec to finish, then closes the read side.
func (c conn) Close() error {
	err := c.pw.Close()
	if err != nil {
		return fmt.Errorf("failed to close writer: %w", err)
	}
	<-c.execDone
	err = c.pr.Close()
	if err != nil {
		return fmt.Errorf("failed to close reader: %w", err)
	}
	return nil
}

func (c conn) LocalAddr() net.Addr {
	return addr{}
}

func (c conn) RemoteAddr() net.Addr {
	return addr{}
}

// Deadlines are not supported on the pipe-backed connection.
func (c conn) SetDeadline(t time.Time) error { return nil }

func (c conn) SetReadDeadline(t time.Time) error { return nil }

func (c conn) SetWriteDeadline(t time.Time) error { return nil }

// newConn creates two pipes and returns the ends meant for the exec'd process
// (its stdin reader and stdout writer) together with a net.Conn backed by the
// opposite ends.
func newConn(execDone <-chan struct{}) (*io.PipeReader, *io.PipeWriter, conn) {
	pr0, pw0 := io.Pipe() // socat stdout -> pw0 -> pr0 -> conn.Read
	pr1, pw1 := io.Pipe() // conn.Write -> pw1 -> pr1 -> socat stdin
	rwc := conn{pr: pr0, pw: pw1, execDone: execDone}
	return pr1, pw0, rwc
}
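For context, this is how the dialer might be wired into an HTTP client end to end. A minimal sketch: the import path and registry URL are assumptions, and error handling is abbreviated.

	package main

	import (
		"context"
		"fmt"
		"net/http"

		"knative.dev/kn-plugin-func/k8s" // assumed import path for this package
	)

	func main() {
		ctx := context.Background()

		dialer, err := k8s.NewInClusterDialer(ctx)
		if err != nil {
			panic(err)
		}
		defer dialer.Close()

		// Route all of the client's TCP dials through the in-cluster pod.
		client := http.Client{
			Transport: &http.Transport{DialContext: dialer.DialContext},
		}

		// Hypothetical cluster-internal service, not exposed outside the cluster.
		resp, err := client.Get("http://image-registry.openshift-image-registry.svc:5000/v2/")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		fmt.Println(resp.Status)
	}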