Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lunchpail enqueue commands #122

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
/images
/builds
/tests
.git
*.log
*~
18 changes: 2 additions & 16 deletions charts/shell/prestop.sh
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
#!/usr/bin/env bash
#!/bin/sh

echo "DEBUG prestop starting"

config=/tmp/rclone-prestop.conf
donefile=s3:/$LUNCHPAIL_QUEUE_PATH/done

cat <<EOF > $config
[s3]
type = s3
provider = Other
env_auth = false
endpoint = $lunchpail_queue_endpoint
access_key_id = $lunchpail_queue_accessKeyID
secret_access_key = $lunchpail_queue_secretAccessKey
acl = public-read
EOF

echo "DEBUG prestop touching donefile"
rclone --config $config touch $donefile
lunchpail qdone
echo "DEBUG prestop touching donefile: done"

echo "INFO Done with my part of the job"
22 changes: 22 additions & 0 deletions cmd/subcommands/enqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package subcommands

import (
"github.com/spf13/cobra"

"lunchpail.io/cmd/subcommands/enqueue"
)

func newEnqueueCmd() *cobra.Command {
return &cobra.Command{
Use: "enqueue",
Short: "Commands that help with enqueueing work tasks",
Long: "Commands that help with enqueueing work tasks",
}
}

func init() {
enqueueCmd := newEnqueueCmd()
rootCmd.AddCommand(enqueueCmd)
enqueueCmd.AddCommand(enqueue.NewEnqueueFileCmd())
enqueueCmd.AddCommand(enqueue.NewEnqueueFromS3Cmd())
}
31 changes: 31 additions & 0 deletions cmd/subcommands/enqueue/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package enqueue

import (
"fmt"

"github.com/spf13/cobra"

"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/workstealer"
)

func NewEnqueueFileCmd() *cobra.Command {
var cmd = &cobra.Command{
Use: "file <file>",
Short: "Enqueue a single file as a work task",
Long: "Enqueue a single file as a work task",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if compilation.IsCompiled() {
// TODO: pull out command line and other
// embeddings from this compiled executable
return fmt.Errorf("TODO")
}

return workstealer.EnqueueFile(args[0])
}

return cmd
}
39 changes: 39 additions & 0 deletions cmd/subcommands/enqueue/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package enqueue

import (
"fmt"
"os"

"github.com/spf13/cobra"

"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/workstealer"
)

func NewEnqueueFromS3Cmd() *cobra.Command {
var cmd = &cobra.Command{
Use: "s3 <path> <envVarPrefix>",
Short: "Enqueue a files in a given S3 path",
Long: "Enqueue a files in a given S3 path",
Args: cobra.MatchAll(cobra.ExactArgs(2), cobra.OnlyValidArgs),
}

var repeat int
cmd.Flags().IntVarP(&repeat, "repeat", "r", 1, "Upload N copies of the task")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if compilation.IsCompiled() {
// TODO: pull out command line and other
// embeddings from this compiled executable
return fmt.Errorf("TODO")
}

envvarPrefix := args[1]
endpoint := os.Getenv(envvarPrefix + "endpoint")
accessKeyID := os.Getenv(envvarPrefix + "accessKeyID")
secretAccessKey := os.Getenv(envvarPrefix + "secretAccessKey")
return workstealer.EnqueueFromS3(args[0], endpoint, accessKeyID, secretAccessKey, repeat)
}

return cmd
}
35 changes: 35 additions & 0 deletions cmd/subcommands/qdone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package subcommands

import (
"fmt"

"github.com/spf13/cobra"

"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/workstealer"
)

func newQdoneCmd() *cobra.Command {
var cmd = &cobra.Command{
Use: "qdone",
Short: "Indicate that dispatching is done",
Long: "Indicate that dispatching is done",
Args: cobra.MatchAll(cobra.OnlyValidArgs),
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if compilation.IsCompiled() {
// TODO: pull out command line and other
// embeddings from this compiled executable
return fmt.Errorf("TODO")
}

return workstealer.Qdone()
}

return cmd
}

func init() {
rootCmd.AddCommand(newQdoneCmd())
}
10 changes: 7 additions & 3 deletions cmd/subcommands/qls.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

func newQlsCmd() *cobra.Command {
var cmd = &cobra.Command{
Use: "qls <path>",
Use: "qls [path]",
Short: "List queue path",
Long: "List queue path",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
Args: cobra.MatchAll(cobra.MaximumNArgs(1), cobra.OnlyValidArgs),
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
Expand All @@ -24,7 +24,11 @@ func newQlsCmd() *cobra.Command {
return fmt.Errorf("TODO")
}

return workstealer.Qls(args[0])
path := ""
if len(args) == 1 {
path = args[0]
}
return workstealer.Qls(path)
}

return cmd
Expand Down
107 changes: 12 additions & 95 deletions pkg/fe/transformer/api/dispatch/parametersweep/main.sh
Original file line number Diff line number Diff line change
@@ -1,50 +1,33 @@
#!/usr/bin/env bash
#!/bin/sh

set -e
set -o pipefail
#set -o pipefail

echo -n "Started TaskDispatcher method=$__LUNCHPAIL_METHOD "
if [[ $__LUNCHPAIL_METHOD = tasksimulator ]]
then echo "injectedTasksPerInterval=$TASKS intervalSeconds=$__LUNCHPAIL_INTERVAL"
elif [[ $__LUNCHPAIL_METHOD = parametersweep ]]
then echo "min=$__LUNCHPAIL_SWEEP_MIN max=$__LUNCHPAIL_SWEEP_MAX step=$__LUNCHPAIL_SWEEP_STEP"
fi
echo -n "Started TaskDispatcher "
echo "min=$__LUNCHPAIL_SWEEP_MIN max=$__LUNCHPAIL_SWEEP_MAX step=$__LUNCHPAIL_SWEEP_STEP"

printenv

# test injected values from -f values.yaml
# taskprefix2 can be used to test that e.g. numerical values are processed correctly
if [[ -n "$taskprefix" ]]
if [ -n "$taskprefix" ]
then taskprefix=${taskprefix}${taskprefix2}
else taskprefix=task
fi
echo "got value taskprefix=$taskprefix"

if [[ -n "$DEBUG" ]]
if [ -n "$DEBUG" ]
then printenv
fi

config=/tmp/rclone.conf
remote=s3:/${LUNCHPAIL_QUEUE_PATH}/inbox
cat <<EOF > $config
[s3]
type = s3
provider = Other
env_auth = false
endpoint = $lunchpail_queue_endpoint
access_key_id = $lunchpail_queue_accessKeyID
secret_access_key = $lunchpail_queue_secretAccessKey
acl = public-read
EOF

# how many tasks we've injected so far; it is useful to keep the
# filename of tasks consistent, so that tests can look for a
# deterministic set of tasks
idx=0

if [[ $__LUNCHPAIL_METHOD = parametersweep ]]
then
for parameter_value in $(seq $__LUNCHPAIL_SWEEP_MIN $__LUNCHPAIL_SWEEP_STEP $__LUNCHPAIL_SWEEP_MAX)
do
if [[ $parameter_value != $__LUNCHPAIL_SWEEP_MIN ]]
for parameter_value in $(seq $__LUNCHPAIL_SWEEP_MIN $__LUNCHPAIL_SWEEP_STEP $__LUNCHPAIL_SWEEP_MAX)
do
if [ "$parameter_value" != "$__LUNCHPAIL_SWEEP_MIN" ]
then sleep ${__LUNCHPAIL_INTERVAL-5}
fi

Expand All @@ -54,72 +37,6 @@ then
echo "Injecting task=$task parameter_value=${parameter_value}"
echo -n ${parameter_value} > $task

rclone --config $config sync $PROGRESS $task $remote
rm -f "$task"
done

exit
fi

# otherwise tasksimulator
if [[ -n "$COLUMNS" ]] && [[ -n "$COLUMN_TYPES" ]]
then echo "Using schema columns=\"$COLUMNS\" columnTypes=\"$COLUMN_TYPES\""
fi

for i in $(seq 1 $TASKS)
do
if [[ $i > 1 ]]
then sleep ${__LUNCHPAIL_INTERVAL-5}
fi

task=/tmp/${taskprefix}-$(cat /proc/sys/kernel/random/uuid).txt
echo "Injecting task=$task format=${FORMAT-generic}"

if [[ $FORMAT = parquet ]] && [[ -n "$COLUMNS" ]] && [[ -n "$COLUMN_TYPES" ]]
then
# Simulated parquet task
echo "Simulating a parquet task"
echo "$COLUMNS" | tr " " "," > $task # csv column header

# for each row
for j in $(seq 1 ${NROWS_PER_TASK-10})
do
# for each column
IDX=0
for type in $COLUMN_TYPES
do
case $type in
number)
VAL=$RANDOM
;;
string)
VAL="Lorem ipsum dolor sit amet consectetur adipiscing elit. Vestibulum pharetra eros lectus. Nulla bibendum ligula sapien non pellentesque urna vestibulum eu. Duis ut eleifend sem. Nam eget diam euismod lacinia massa quis vestibulum nulla. Aliquam porttitor egestas interdum. Morbi eu porttitor velit. Pellentesque habitant morbi tristique senectus et netus et."
;;
*)
VAL="null"
;;
esac

if [[ $IDX != 0 ]]; then echo -n "," >> $task; fi
echo -n "$VAL" >> $task

IDX=$((IDX + 1))
done
echo "" >> $task # end the line
done
#parquet convert-csv $task -o ${task}.parquet
python -c "import pandas as pd; pd.read_csv('$task').to_parquet('$task.parquet')"
# python -c 'import pandas as pd; pd.read_csv("/tmp/foo.csv").to_parquet("/tmp/foo.parquet")'
otask="$task"
task="${task}.parquet"
rm -f "$otask"
else
echo "Simulated generic task" > $task
fi

rclone --config $config sync $PROGRESS $task $remote
lunchpail enqueue file $task
rm -f "$task"
done

echo "Exiting"
sleep infinity
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func transpile(sweep hlir.ParameterSweep) (hlir.Application, error) {
app.ApiVersion = sweep.ApiVersion
app.Kind = "Application"
app.Metadata.Name = sweep.Metadata.Name
app.Spec.Image = fmt.Sprintf("%s/%s/lunchpail-rclone:0.0.1", lunchpail.ImageRegistry, lunchpail.ImageRepo)
app.Spec.Image = fmt.Sprintf("%s/%s/lunchpail:%s", lunchpail.ImageRegistry, lunchpail.ImageRepo, lunchpail.Version())
app.Spec.Role = "dispatcher"
app.Spec.Command = "./main.sh"
app.Spec.Code = []hlir.Code{
Expand Down
8 changes: 0 additions & 8 deletions pkg/fe/transformer/api/dispatch/s3/embed.go

This file was deleted.

Loading