Skip to content

Commit

Permalink
refactor: move qstat and utilization streamer kubernetes logic into b…
Browse files Browse the repository at this point in the history
…ackend

This also has a first pass at consolidating "which namespace" logic into the backend state, rather than having it float around the UI randomly.

TODO: pkg/observe/logs still has kubernetes code that needs to be refactored into the backend

Signed-off-by: Nick Mitchell <nickm@us.ibm.com>
  • Loading branch information
starpit committed Aug 28, 2024
1 parent 5729202 commit a019bd5
Show file tree
Hide file tree
Showing 44 changed files with 275 additions and 219 deletions.
2 changes: 1 addition & 1 deletion cmd/subcommands/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newCompileCmd() *cobra.Command {
}

cmd.Flags().StringVarP(&branchFlag, "branch", "b", branchFlag, "Git branch to pull from")
compilationOptions := addCompilationOptions(cmd)
compilationOptions := addCompilationOptions(cmd, false)
cmd.Flags().BoolVarP(&allFlag, "all-platforms", "A", allFlag, "Generate binaries for all supported platform/arch combinations")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", verboseFlag, "Verbose output")

Expand Down
6 changes: 2 additions & 4 deletions cmd/subcommands/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func Newcmd() *cobra.Command {
var namespaceFlag string
var verboseFlag bool
var intervalSecondsFlag int

Expand All @@ -21,7 +20,6 @@ func Newcmd() *cobra.Command {
Long: "Displays CPU utilization",
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Verbose output")
cmd.Flags().IntVarP(&intervalSecondsFlag, "interval", "i", 2, "Sampling interval")
tgtOpts := addTargetOptions(cmd)
Expand All @@ -32,12 +30,12 @@ func Newcmd() *cobra.Command {
maybeRun = args[0]
}

backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{}) // TODO compilation.Options
if err != nil {
return err
}

return cpu.UI(maybeRun, backend, cpu.CpuOptions{Namespace: namespaceFlag, Verbose: verboseFlag, IntervalSeconds: intervalSecondsFlag})
return cpu.UI(maybeRun, backend, cpu.CpuOptions{Namespace: tgtOpts.Namespace, Verbose: verboseFlag, IntervalSeconds: intervalSecondsFlag})
}

return cmd
Expand Down
6 changes: 2 additions & 4 deletions cmd/subcommands/down.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func newDownCmd() *cobra.Command {
var namespaceFlag string
var verboseFlag bool
var deleteNamespaceFlag bool
var deleteAllRunsFlag bool
Expand All @@ -24,7 +23,6 @@ func newDownCmd() *cobra.Command {
Long: "Undeploy the application",
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Verbose output")
cmd.Flags().BoolVarP(&deleteNamespaceFlag, "delete-namespace", "N", false, "Also delete namespace (only for empty namespaces)")
cmd.Flags().BoolVarP(&deleteAllRunsFlag, "all", "A", false, "Delete all runs in the given namespace")
Expand All @@ -34,13 +32,13 @@ func newDownCmd() *cobra.Command {
tgtOpts := addTargetOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{ApiKey: apiKey}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{ApiKey: apiKey}) // TODO compilation.Options
if err != nil {
return err
}

return boot.DownList(args, backend, boot.DownOptions{
Namespace: namespaceFlag, Verbose: verboseFlag, DeleteNamespace: deleteNamespaceFlag,
Namespace: tgtOpts.Namespace, Verbose: verboseFlag, DeleteNamespace: deleteNamespaceFlag,
DeleteAll: deleteAllRunsFlag,
ApiKey: apiKey, DeleteCloudResources: deleteCloudResourcesFlag})
}
Expand Down
6 changes: 2 additions & 4 deletions cmd/subcommands/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
)

func newLogsCommand() *cobra.Command {
var namespaceFlag string
var componentsFlag []string
var followFlag bool
var verboseFlag bool
Expand All @@ -25,7 +24,6 @@ func newLogsCommand() *cobra.Command {
Long: "Print or stream logs from the application",
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
cmd.Flags().StringSliceVarP(&componentsFlag, "component", "c", []string{"workers"}, "Components to track (workers|dispatcher|workstealer)")
cmd.Flags().BoolVarP(&followFlag, "follow", "f", false, "Stream the logs")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Verbose output")
Expand All @@ -37,7 +35,7 @@ func newLogsCommand() *cobra.Command {
maybeRun = args[0]
}

backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{}) // TODO compilation.Options
if err != nil {
return err
}
Expand All @@ -60,7 +58,7 @@ func newLogsCommand() *cobra.Command {
}
}

return observe.Logs(maybeRun, backend, observe.LogsOptions{Namespace: namespaceFlag, Follow: followFlag, Verbose: verboseFlag, Components: comps})
return observe.Logs(maybeRun, backend, observe.LogsOptions{Namespace: tgtOpts.Namespace, Follow: followFlag, Verbose: verboseFlag, Components: comps})
}

return cmd
Expand Down
7 changes: 2 additions & 5 deletions cmd/subcommands/qlast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ import (
)

func newQlastCommand() *cobra.Command {
var namespaceFlag string

var cmd = &cobra.Command{
Use: "qlast",
Short: "Stream queue statistics to console",
Args: cobra.MatchAll(cobra.MinimumNArgs(1), cobra.OnlyValidArgs),
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
tgtOpts := addTargetOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
Expand All @@ -31,12 +28,12 @@ func newQlastCommand() *cobra.Command {
extra = args[1]
}

backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{}) // TODO compilation.Options
if err != nil {
return err
}

val, err := qstat.Qlast(marker, extra, backend, qstat.QlastOptions{Namespace: namespaceFlag})
val, err := qstat.Qlast(marker, extra, backend, qstat.QlastOptions{})
if err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions cmd/subcommands/qstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

func newQstatCommand() *cobra.Command {
var namespaceFlag string
var tailFlag int64
var followFlag bool
var verboseFlag bool
Expand All @@ -21,7 +20,6 @@ func newQstatCommand() *cobra.Command {
Short: "Stream queue statistics to console",
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
cmd.Flags().BoolVarP(&followFlag, "follow", "f", false, "Track updates (rather than printing once)")
cmd.Flags().Int64VarP(&tailFlag, "tail", "T", -1, "Number of lines to tail")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Verbose output")
Expand All @@ -34,12 +32,12 @@ func newQstatCommand() *cobra.Command {
maybeRun = args[0]
}

backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{}) // TODO compilation.Options
if err != nil {
return err
}

return qstat.UI(maybeRun, backend, qstat.Options{Namespace: namespaceFlag, Follow: followFlag, Tail: tailFlag, Verbose: verboseFlag, Quiet: quietFlag})
return qstat.UI(maybeRun, backend, qstat.Options{Follow: followFlag, Tail: tailFlag, Verbose: verboseFlag, Quiet: quietFlag})
}

return cmd
Expand Down
6 changes: 2 additions & 4 deletions cmd/subcommands/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func newStatusCommand() *cobra.Command {
var namespaceFlag string
var watchFlag bool
var verboseFlag bool
var summaryFlag bool
Expand All @@ -23,7 +22,6 @@ func newStatusCommand() *cobra.Command {
Short: "Status of a run",
}

cmd.Flags().StringVarP(&namespaceFlag, "namespace", "n", "", "Kubernetes namespace that houses your instance")
cmd.Flags().BoolVarP(&watchFlag, "watch", "w", false, "Track updates to run status")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Stream more verbose updates to console")
cmd.Flags().BoolVarP(&summaryFlag, "summary", "s", false, "Show only summary information, do not break out queue stats")
Expand All @@ -42,12 +40,12 @@ func newStatusCommand() *cobra.Command {
maybeRun = args[0]
}

backend, err := be.New(tgtOpts.TargetPlatform, compilation.Options{}) // TODO compilation.Options
backend, err := be.New(tgtOpts, compilation.Options{}) // TODO compilation.Options
if err != nil {
return err
}

return status.UI(maybeRun, backend, status.Options{Namespace: namespaceFlag, Watch: watchFlag, Verbose: verboseFlag, Summary: summaryFlag, Nloglines: loglinesFlag, IntervalSeconds: intervalFlag})
return status.UI(maybeRun, backend, status.Options{Watch: watchFlag, Verbose: verboseFlag, Summary: summaryFlag, Nloglines: loglinesFlag, IntervalSeconds: intervalFlag})
}

return cmd
Expand Down
21 changes: 14 additions & 7 deletions cmd/subcommands/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package subcommands
import (
"github.com/spf13/cobra"

"lunchpail.io/pkg/be"
"lunchpail.io/pkg/be/platform"
"lunchpail.io/pkg/compilation"
)

type TargetOptions struct {
TargetPlatform platform.Platform
}
type TargetOptions = be.TargetOptions

func addTargetOptions(cmd *cobra.Command) TargetOptions {
options := TargetOptions{TargetPlatform: platform.Kubernetes}

if compilation.IsCompiled() {
// by default, we use Namespace == app name
options.Namespace = compilation.Name()
}

func addTargetOptions(cmd *cobra.Command) *TargetOptions {
var options TargetOptions
options.TargetPlatform = platform.Kubernetes
cmd.Flags().VarP(&options.TargetPlatform, "target", "t", "Deployment target [kubernetes, ibmcloud, skypilot]")
return &options
cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", options.Namespace, "Kubernetes namespace to deploy to")

return options
}
11 changes: 7 additions & 4 deletions cmd/subcommands/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"github.com/spf13/cobra"
)

func addCompilationOptions(cmd *cobra.Command) *compilation.Options {
func addCompilationOptions(cmd *cobra.Command, skipNamespaceFlag bool) *compilation.Options {
var options compilation.Options

cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "", "Kubernetes namespace to deploy to")
if !skipNamespaceFlag {
cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "", "Kubernetes namespace to deploy to")
}

cmd.Flags().StringVarP(&options.ImagePullSecret, "image-pull-secret", "s", "", "Of the form <user>:<token>@ghcr.io")
cmd.Flags().StringVarP(&options.Queue, "queue", "", "", "Use the queue defined by this Secret (data: accessKeyID, secretAccessKey, endpoint)")
cmd.Flags().BoolVarP(&options.HasGpuSupport, "gpu", "", false, "Run with GPUs (if supported by the application)")
Expand Down Expand Up @@ -57,7 +60,7 @@ func newUpCmd() *cobra.Command {
}

cmd.Flags().SortFlags = false
appOpts := addCompilationOptions(cmd)
appOpts := addCompilationOptions(cmd, true)
tgtOpts := addTargetOptions(cmd)
cmd.Flags().BoolVarP(&dryrunFlag, "dry-run", "", false, "Emit application yaml to stdout")
cmd.Flags().BoolVarP(&verboseFlag, "verbose", "v", false, "Verbose output")
Expand Down Expand Up @@ -86,7 +89,7 @@ func newUpCmd() *cobra.Command {
Zone: appOpts.Zone, Profile: appOpts.Profile, ImageID: appOpts.ImageID, CreateNamespace: appOpts.CreateNamespace}
configureOptions := linker.ConfigureOptions{CompilationOptions: compilationOptions, Verbose: verboseFlag}

backend, err := be.New(tgtOpts.TargetPlatform, compilationOptions)
backend, err := be.New(tgtOpts, compilationOptions)
if err != nil {
return err
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/be/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package be

import (
"lunchpail.io/pkg/be/events"
"lunchpail.io/pkg/be/events/qstat"
"lunchpail.io/pkg/be/events/utilization"
"lunchpail.io/pkg/be/platform"
"lunchpail.io/pkg/be/runs"
"lunchpail.io/pkg/ir"

"golang.org/x/sync/errgroup"
)

type Backend interface {
Expand All @@ -18,14 +22,20 @@ type Backend interface {
Down(linked ir.Linked, opts platform.CliOptions, verbose bool) error

// Delete namespace
DeleteNamespace(compilationName, namespace string) error
DeleteNamespace(compilationName string) error

// List deployed runs
ListRuns(appName, namespace string) ([]runs.Run, error)
ListRuns(appName string) ([]runs.Run, error)

//
StreamRunEvents(appname, runname, namespace string) (chan events.Message, error)
StreamRunEvents(appname, runname string) (chan events.Message, error)

//
StreamRunComponentUpdates(appname, runname, namespace string) (chan events.ComponentUpdate, chan events.Message, error)
StreamRunComponentUpdates(appname, runname string) (chan events.ComponentUpdate, chan events.Message, error)

// Stream cpu and memory statistics
StreamUtilization(runname string, intervalSeconds int) (chan utilization.Model, error)

// Stream queue statistics
StreamQueueStats(runname string, opts qstat.Options) (chan qstat.Model, *errgroup.Group, error)
}
8 changes: 4 additions & 4 deletions pkg/observe/qstat/model.go → pkg/be/events/qstat/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Model struct {
}

// Count of Live workers across all pools
func (model *Model) liveWorkers() int {
func (model *Model) LiveWorkers() int {
N := 0
for _, pool := range model.Pools {
N += len(pool.LiveWorkers)
Expand All @@ -35,7 +35,7 @@ func (model *Model) liveWorkers() int {
}

// Count of Dead workers across all pools
func (model *Model) deadWorkers() int {
func (model *Model) DeadWorkers() int {
N := 0
for _, pool := range model.Pools {
N += len(pool.DeadWorkers)
Expand All @@ -44,6 +44,6 @@ func (model *Model) deadWorkers() int {
}

// Count of Live or Dead workers across all pools
func (model *Model) workers() int {
return model.liveWorkers() + model.deadWorkers()
func (model *Model) Workers() int {
return model.LiveWorkers() + model.DeadWorkers()
}
8 changes: 8 additions & 0 deletions pkg/be/events/qstat/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package qstat

type Options struct {
Follow bool
Tail int64
Verbose bool
Quiet bool
}
32 changes: 32 additions & 0 deletions pkg/be/events/utilization/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package utilization

import (
"sort"

"lunchpail.io/pkg/lunchpail"
)

type Worker struct {
Name string
Component lunchpail.Component
CpuUtil float64
MemoryBytes uint64
}

type Model struct {
Workers []Worker
}

func (model *Model) HasData() bool {
return len(model.Workers) > 0
}

func (model *Model) Sorted() []Worker {
w := []Worker{}
for _, worker := range model.Workers {
w = append(w, worker)
}

sort.Slice(w, func(i, j int) bool { return w[i].CpuUtil > w[j].CpuUtil })
return w
}
2 changes: 1 addition & 1 deletion pkg/be/ibmcloud/down.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (backend Backend) Down(linked ir.Linked, opts platform.CliOptions, verbose
return nil
}

func (backend Backend) DeleteNamespace(compilationName, namespace string) error {
func (backend Backend) DeleteNamespace(compilationName string) error {
// TODO?
return nil
}
13 changes: 13 additions & 0 deletions pkg/be/ibmcloud/qstat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ibmcloud

import (
"fmt"

"golang.org/x/sync/errgroup"

"lunchpail.io/pkg/be/events/qstat"
)

func (backend Backend) StreamQueueStats(runname string, opts qstat.Options) (chan qstat.Model, *errgroup.Group, error) {
return nil, nil, fmt.Errorf("Unsupported operation: 'StreamQueueStats'")
}
Loading

0 comments on commit a019bd5

Please sign in to comment.