Skip to content

Commit

Permalink
feat: lunchpail enqueue file --wait
Browse files Browse the repository at this point in the history
Fixes #138

covered by test7-wait

Signed-off-by: Nick Mitchell <nickm@us.ibm.com>
  • Loading branch information
starpit committed Aug 14, 2024
1 parent 11bca29 commit ad6557d
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cmd/subcommands/enqueue/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ func NewEnqueueFileCmd() *cobra.Command {
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
}

var wait bool
var verbose bool
cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Wait for the task to be completed")
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if compilation.IsCompiled() {
// TODO: pull out command line and other
// embeddings from this compiled executable
return fmt.Errorf("TODO")
}

return queue.EnqueueFile(args[0])
return queue.EnqueueFile(args[0], queue.EnqueueFileOptions{Wait: wait, Verbose: verbose})
}

return cmd
Expand Down
10 changes: 9 additions & 1 deletion pkg/fe/transformer/api/dispatch/parametersweep/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ do
echo "Injecting task=$task parameter_value=${parameter_value}"
echo -n ${parameter_value} > $task

lunchpail enqueue file $task
if [ -n "$__LUNCHPAIL_WAIT" ]
then waitflag="--wait"
fi

if [ -n "$__LUNCHPAIL_VERBOSE" ]
then verboseflag="--verbose"
fi

lunchpail enqueue file $task $waitflag $verboseflag
rm -f "$task"
done
8 changes: 8 additions & 0 deletions pkg/fe/transformer/api/dispatch/parametersweep/transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,13 @@ func transpile(sweep hlir.ParameterSweep) (hlir.Application, error) {
app.Spec.Env["__LUNCHPAIL_INTERVAL"] = strconv.Itoa(sweep.Spec.Interval)
}

if sweep.Spec.Wait {
app.Spec.Env["__LUNCHPAIL_WAIT"] = "true"
}

if sweep.Spec.Verbose {
app.Spec.Env["__LUNCHPAIL_VERBOSE"] = "true"
}

return app, nil
}
6 changes: 6 additions & 0 deletions pkg/ir/hlir/parametersweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,11 @@ type ParameterSweep struct {
Step int `yaml:",omitempty"`
Interval int `yaml:",omitempty"`
Env Env `yaml:",omitempty"`

// Wait for each task to complete before proceeding to the next task
Wait bool

// Verbose output
Verbose bool
}
}
20 changes: 18 additions & 2 deletions pkg/runtime/queue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ import (
"golang.org/x/sync/errgroup"
)

func EnqueueFile(task string) error {
// EnqueueFileOptions carries the optional settings accepted by EnqueueFile.
// The zero value leaves both options off.
type EnqueueFileOptions struct {
	Wait    bool // wait for the enqueued task to be completed
	Verbose bool // verbose output
}

func EnqueueFile(task string, opts EnqueueFileOptions) error {
c, err := NewS3Client()
if err != nil {
return err
Expand All @@ -21,7 +29,15 @@ func EnqueueFile(task string) error {
return err
}

return c.Upload(c.Paths.Bucket, task, filepath.Join(c.Paths.PoolPrefix, c.Paths.Inbox, filepath.Base(task)))
if err := c.Upload(c.Paths.Bucket, task, filepath.Join(c.Paths.PoolPrefix, c.Paths.Inbox, filepath.Base(task))); err != nil {
return err
}

if opts.Wait {
return c.WaitForCompletion(filepath.Base(task), opts.Verbose)
}

return nil
}

func EnqueueFromS3(fullpath, endpoint, accessKeyId, secretAccessKey string, repeat int) error {
Expand Down
28 changes: 14 additions & 14 deletions pkg/runtime/queue/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewS3ClientFromOptions(opts S3ClientOptions) (S3Client, error) {
return S3Client{client, opts.Endpoint, pathsForRun()}, err
}

func (s3 *S3Client) Lsf(bucket, prefix string) ([]string, error) {
func (s3 S3Client) Lsf(bucket, prefix string) ([]string, error) {
objectCh := s3.client.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{
Prefix: prefix + "/",
Recursive: false,
Expand All @@ -76,15 +76,15 @@ func (s3 *S3Client) Lsf(bucket, prefix string) ([]string, error) {
return tasks, nil
}

func (s3 *S3Client) Exists(bucket, prefix, file string) bool {
// Exists reports whether the object at prefix/file in bucket can be stat'ed.
//
// Any error from StatObject yields false, so a false result does not
// distinguish a missing object from any other failure of the stat call.
func (s3 S3Client) Exists(bucket, prefix, file string) bool {
	_, err := s3.client.StatObject(context.Background(), bucket, filepath.Join(prefix, file), minio.StatObjectOptions{})
	return err == nil
}

func (s3 *S3Client) Copyto(sourceBucket, source, destBucket, dest string) error {
func (s3 S3Client) Copyto(sourceBucket, source, destBucket, dest string) error {
src := minio.CopySrcOptions{
Bucket: sourceBucket,
Object: source,
Expand All @@ -99,7 +99,7 @@ func (s3 *S3Client) Copyto(sourceBucket, source, destBucket, dest string) error
return err
}

func (origin *S3Client) CopyToRemote(remote S3Client, sourceBucket, source, destBucket, dest string) error {
func (origin S3Client) CopyToRemote(remote S3Client, sourceBucket, source, destBucket, dest string) error {
if origin.endpoint == remote.endpoint {
// special case...
return origin.Copyto(sourceBucket, source, destBucket, dest)
Expand All @@ -120,46 +120,46 @@ func (origin *S3Client) CopyToRemote(remote S3Client, sourceBucket, source, dest
return nil
}

func (s3 *S3Client) Moveto(bucket, source, destination string) error {
// Moveto relocates an object within bucket: it copies source to destination
// and then removes source. These are two separate calls; when the copy
// fails its error is returned and source is not removed.
func (s3 S3Client) Moveto(bucket, source, destination string) error {
	err := s3.Copyto(bucket, source, bucket, destination)
	if err == nil {
		err = s3.Rm(bucket, source)
	}
	return err
}

func (s3 *S3Client) Upload(bucket, source, destination string) error {
// Upload stores the file source in bucket under the object name destination,
// using the client's FPutObject with default options.
func (s3 S3Client) Upload(bucket, source, destination string) error {
	ctx := context.Background()
	if _, err := s3.client.FPutObject(ctx, bucket, destination, source, minio.PutObjectOptions{}); err != nil {
		return err
	}
	return nil
}

func (s3 *S3Client) Download(bucket, source, destination string) error {
// Download fetches the object source from bucket into the file destination,
// using the client's FGetObject with default options.
func (s3 S3Client) Download(bucket, source, destination string) error {
	ctx := context.Background()
	opts := minio.GetObjectOptions{}
	return s3.client.FGetObject(ctx, bucket, source, destination, opts)
}

func (s3 *S3Client) Touch(bucket, filePath string) error {
// Touch writes a zero-length object at filePath in bucket.
func (s3 S3Client) Touch(bucket, filePath string) error {
	ctx := context.Background()
	_, err := s3.client.PutObject(ctx, bucket, filePath, strings.NewReader(""), 0, minio.PutObjectOptions{})
	return err
}

func (s3 *S3Client) Rm(bucket, filePath string) error {
// Rm removes the object at filePath from bucket, using default options.
func (s3 S3Client) Rm(bucket, filePath string) error {
	ctx := context.Background()
	opts := minio.RemoveObjectOptions{}
	return s3.client.RemoveObject(ctx, bucket, filePath, opts)
}

func (s3 *S3Client) Mark(bucket, filePath, marker string) error {
// Mark writes marker as the entire content of the object at filePath in
// bucket.
func (s3 S3Client) Mark(bucket, filePath, marker string) error {
	body := strings.NewReader(marker)
	size := int64(len(marker))
	_, err := s3.client.PutObject(context.Background(), bucket, filePath, body, size, minio.PutObjectOptions{})
	return err
}

func (s3 *S3Client) ListObjects(bucket, filePath string, recursive bool) <-chan minio.ObjectInfo {
// ListObjects returns the channel produced by the client's ListObjects call
// on bucket, with filePath used as the listing prefix and recursive passed
// through as the Recursive option.
func (s3 S3Client) ListObjects(bucket, filePath string, recursive bool) <-chan minio.ObjectInfo {
	opts := minio.ListObjectsOptions{
		Prefix:    filePath,
		Recursive: recursive,
	}
	return s3.client.ListObjects(context.Background(), bucket, opts)
}

func (s3 *S3Client) Cat(bucket, filePath string) error {
func (s3 S3Client) Cat(bucket, filePath string) error {
s, err := s3.client.GetObject(context.Background(), bucket, filePath, minio.GetObjectOptions{})
if err != nil {
return err
Expand All @@ -168,7 +168,7 @@ func (s3 *S3Client) Cat(bucket, filePath string) error {
return nil
}

func (s3 *S3Client) BucketExists(bucket string) (bool, error) {
func (s3 S3Client) BucketExists(bucket string) (bool, error) {
yup := false
for {
if exists, err := s3.client.BucketExists(context.Background(), bucket); err != nil {
Expand All @@ -189,7 +189,7 @@ func (s3 *S3Client) BucketExists(bucket string) (bool, error) {
return yup, nil
}

func (s3 *S3Client) Mkdirp(bucket string) error {
func (s3 S3Client) Mkdirp(bucket string) error {
exists, err := s3.BucketExists(bucket)
if err != nil {
return err
Expand Down
33 changes: 33 additions & 0 deletions pkg/runtime/queue/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package queue

import (
"fmt"
"os"
"path/filepath"
"slices"
"time"
)

// WaitForCompletion blocks until task appears in the listing of the run's
// outbox, re-listing the outbox at a fixed interval.
//
// task is compared verbatim against the entries returned by Lsf. When
// verbose is true, progress messages are written to stderr. An error from
// listing the outbox ends the wait and is returned as-is. There is no
// timeout: if task never shows up in the outbox, this call never returns.
func (c S3Client) WaitForCompletion(task string, verbose bool) error {
	// Delay between successive listings of the outbox.
	const pollInterval = 3 * time.Second

	// The outbox location does not change while we wait; compute it once.
	outbox := filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox)

	for {
		doneTasks, err := c.Lsf(c.Paths.Bucket, outbox)
		if err != nil {
			return err
		}

		if slices.Contains(doneTasks, task) {
			if verbose {
				fmt.Fprintf(os.Stderr, "Task completed %s\n", task)
			}
			return nil
		}

		if verbose {
			fmt.Fprintf(os.Stderr, "Still waiting for task completion %s. Here is what is done so far: %v\n", task, doneTasks)
		}
		time.Sleep(pollInterval)
	}
}
8 changes: 7 additions & 1 deletion tests/bin/helpers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ function waitForIt {
for done in "${dones[@]}"; do
idx=0
while true; do
kubectl -n $ns logs $containers -l $workerselector --tail=-1 | grep -E "$done" && break || echo "$(tput setaf 5)🧪 Still waiting for output $done test=$name...$(tput sgr0)"
if [[ -n $DEBUG ]] || (( $idx > 10 ))
then set -x
fi
kubectl -n $ns logs $containers -l 'app.kubernetes.io/component in (workerpool,workdispatcher)' --tail=-1 | grep -E "$done" && break || echo "$(tput setaf 5)🧪 Still waiting for output $done test=$name...$(tput sgr0)"
if [[ -n $DEBUG ]] || (( $idx > 10 ))
then set +x
fi

if [[ -n $DEBUG ]] || (( $idx > 10 )); then
# if we can't find $done in the logs after a few
Expand Down
3 changes: 3 additions & 0 deletions tests/tests/test7-wait/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# test7-wait

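Same as test7, except that this test runs the ParameterSweep in "wait" mode: the dispatcher enqueues each task with `lunchpail enqueue file --wait` and waits for that task to complete before proceeding to the next one.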
37 changes: 37 additions & 0 deletions tests/tests/test7-wait/add-data-to-queue.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env bash
#
# Stage N small task files under a fresh temporary directory and pass the
# staged bucket path to tests/bin/add-data.sh.
#
# Usage: add-data-to-queue.sh <namespace> [num-tasks] [bucket]
#   $1  namespace; exported as NAMESPACE
#   $2  number of task files to create (default: 10)
#   $3  name of the s3 bucket in which to store the tasks (default: test7)

# Quote the inner substitution so a checkout path containing spaces still resolves.
SCRIPTDIR=$(cd "$(dirname "$0")" && pwd)

export NAMESPACE=$1

# number of tasks
N=${2-10}

# name of s3 bucket in which to store the tasks
BUCKET=${3-test7}
RUN_NAME=$BUCKET

B=$(mktemp -d)/$BUCKET # bucket path
D=$B/$BUCKET # data path; in this case the bucket name and the folder name are both the run name
mkdir -p "$D"
echo "Staging to $D" 1>&2

for idx in $(seq 1 "$N") # for each iteration
do
    # if we are doing a test, then make sure to use a
    # repeatable name for the task files, so that we know what
    # to look for when confirming that the tasks were
    # processed by the workers
    if [[ -n "$CI" ]] || [[ -n "$RUNNING_CODEFLARE_TESTS" ]]; then
        id=$idx
    else
        # otherwise, use a more random name, so that we can
        # inject multiple batches of tasks across executions
        # of this script
        id=$(uuidgen)
    fi

    echo "this is task idx=$idx" > "$D/task.$id.txt"
done

"$SCRIPTDIR"/../../../tests/bin/add-data.sh "$B"
13 changes: 13 additions & 0 deletions tests/tests/test7-wait/init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash
#
# Stage the task data for this test by running add-data-to-queue.sh from
# this script's directory.
#
# $1: namespace

# Quote the inner substitution so a checkout path containing spaces still resolves.
SCRIPTDIR=$(cd "$(dirname "$0")" && pwd)

# make sure these values are compatible with the values in ./settings.sh
NUM_TASKS=6

# Arguments are quoted so that an empty namespace stays in the first position
# instead of letting the task count and bucket name shift left.
"$SCRIPTDIR"/add-data-to-queue.sh \
    "$1" \
    "$NUM_TASKS" \
    "${TEST_NAME-test7}"
41 changes: 41 additions & 0 deletions tests/tests/test7-wait/pail/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
apiVersion: lunchpail.io/v1alpha1
kind: Application
metadata:
name: test7-wait
spec:
role: worker
code:
- name: main.sh
source: |
#!/usr/bin/env sh
# $1 input filepath
# $2 output filepath
in="$1"
out="$2"
dataset_name=test # match with below!
bucket_name=test7-wait
folder_name=test7-wait
N=$(ls $dataset_name/$bucket_name/$folder_name | wc -l | xargs)
echo "Processing $N $in"
sleep 5
echo "Done with $in"
command: ./main.sh
minSize: auto
securityContext:
runAsUser: 2000 # lunchpail, same as is specified Dockerfile
runAsGroup: 0 # root, ibid
containerSecurityContext:
runAsUser: 2000 # lunchpail, same as is specified Dockerfile
runAsGroup: 0 # root, ibid

datasets:
- name: test
s3:
secret: test7data
copyIn:
path: "test7-wait/"
9 changes: 9 additions & 0 deletions tests/tests/test7-wait/pail/dataset-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
kind: Secret
metadata:
name: test7data
type: Opaque
stringData:
endpoint: {{ .Values.global.s3Endpoint }}
accessKeyID: {{ .Values.global.s3AccessKey }}
secretAccessKey: {{ .Values.global.s3SecretKey }}
11 changes: 11 additions & 0 deletions tests/tests/test7-wait/pail/dispatcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ParameterSweep dispatcher for the test7-wait test.
apiVersion: lunchpail.io/v1alpha1
kind: ParameterSweep
metadata:
name: test7-wait-workdispatcher
spec:
min: 1
# falls back to 5 when .Values.nTasks is not set
max: {{ .Values.nTasks | default 5 }}
step: 1
# falls back to 5 when .Values.every is not set
interval: {{ .Values.every | default 5 }}
# wait for each task to complete before proceeding to the next task
# (the sweep script then runs `lunchpail enqueue file` with --wait)
wait: true
# verbose output (the sweep script then runs `lunchpail enqueue file` with --verbose)
verbose: true
8 changes: 8 additions & 0 deletions tests/tests/test7-wait/pail/pool1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: lunchpail.io/v1alpha1
kind: WorkerPool
metadata:
name: test7-wait-pool1
spec:
workers:
count: 2
size: auto
1 change: 1 addition & 0 deletions tests/tests/test7-wait/pail/version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.1.4.1
7 changes: 7 additions & 0 deletions tests/tests/test7-wait/settings.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Settings for the test7-wait test.
api=workqueue
taskqueue=test7-wait

# /queue/0,1 <-- 2 workers
# task.1,task.3,task.5 <-- 3 tasks per iter

# Output the test expects to see. "Processing ..." is echoed by main.sh in
# pail/app.yaml; "Task completed ..." is printed by
# `lunchpail enqueue file --wait --verbose` once the task is found in the outbox.
# NOTE(review): presumably these are matched against the pod logs by the
# harness in tests/bin/helpers.sh -- confirm.
expected=("Processing 6 /queue/processing/task.1.txt" "Task completed task.1.txt" "Task completed task.3.txt")

0 comments on commit ad6557d

Please sign in to comment.