From f577f62727a16a282edfa7a49d985e47a4ccef79 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 8 Jun 2022 04:14:09 +0800 Subject: [PATCH] Add GetDeckPath to OutputReader (#268) * Add GetDeckPath to RemoteFileOutputReader Signed-off-by: Kevin Su * make generate Signed-off-by: Kevin Su * Fix tests Signed-off-by: Kevin Su * Fix tests Signed-off-by: Kevin Su * Fix tests Signed-off-by: Kevin Su * more tests Signed-off-by: Kevin Su * nit Signed-off-by: Kevin Su --- .../core/template/template_test.go | 4 ++ go/tasks/pluginmachinery/io/iface.go | 4 ++ .../io/mocks/output_file_paths.go | 32 +++++++++++++ .../pluginmachinery/io/mocks/output_reader.go | 36 ++++++++++++++ .../pluginmachinery/io/mocks/output_writer.go | 32 +++++++++++++ .../ioutils/in_memory_output_reader.go | 10 +++- .../ioutils/in_memory_output_reader_test.go | 44 +++++++++++++++++ go/tasks/pluginmachinery/ioutils/paths.go | 3 ++ .../ioutils/remote_file_output_reader.go | 5 ++ .../ioutils/remote_file_output_reader_test.go | 5 ++ .../ioutils/remote_file_output_writer.go | 4 ++ .../ioutils/remote_file_output_writer_test.go | 48 +++++++++++++++++++ go/tasks/plugins/array/awsbatch/monitor.go | 2 +- go/tasks/plugins/array/k8s/management.go | 2 +- go/tasks/plugins/array/outputs.go | 4 +- go/tasks/plugins/hive/execution_state.go | 2 +- .../plugins/k8s/sagemaker/builtin_training.go | 2 +- .../k8s/sagemaker/hyperparameter_tuning.go | 2 +- go/tasks/plugins/presto/execution_state.go | 2 +- go/tasks/plugins/webapi/athena/utils.go | 2 +- go/tasks/plugins/webapi/athena/utils_test.go | 2 +- go/tasks/plugins/webapi/bigquery/plugin.go | 2 +- 22 files changed, 237 insertions(+), 12 deletions(-) create mode 100644 go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go create mode 100644 go/tasks/pluginmachinery/ioutils/remote_file_output_writer_test.go diff --git a/go/tasks/pluginmachinery/core/template/template_test.go b/go/tasks/pluginmachinery/core/template/template_test.go index ffc5fd02d9..3f1f8460ae 100644 --- a/go/tasks/pluginmachinery/core/template/template_test.go +++ b/go/tasks/pluginmachinery/core/template/template_test.go @@ -45,6 +45,10 @@ type dummyOutputPaths struct { checkpointPath storage.DataReference } +func (d dummyOutputPaths) GetDeckPath() storage.DataReference { + panic("should not be called") +} + func (d dummyOutputPaths) GetPreviousCheckpointsPrefix() storage.DataReference { return d.prevCheckpointPath } diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index b39e4dd9e6..e920c44edf 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -41,6 +41,8 @@ type OutputReader interface { Exists(ctx context.Context) (bool, error) // Read returns the output -> *core.LiteralMap (nil if void), *ExecutionError if user error when reading the output and error to indicate system problems Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) + // GetDeckPath returns a fully qualified path (URN) of deck file. + GetDeckPath() *storage.DataReference } // CheckpointPaths provides the paths / keys to input Checkpoints directory and an output checkpoints directory. @@ -77,6 +79,8 @@ type OutputFilePaths interface { GetOutputPrefixPath() storage.DataReference // GetOutputPath returns a fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend GetOutputPath() storage.DataReference + // GetDeckPath returns a fully qualified path (URN) to where the framework expects the deck.html to exist in the configured storage backend + GetDeckPath() storage.DataReference // GetErrorPath returns a fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly // used by the framework, but could be used in the future GetErrorPath() storage.DataReference diff --git a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go index 5ff4e24f8d..6896a73b80 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go +++ b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go @@ -44,6 +44,38 @@ func (_m *OutputFilePaths) GetCheckpointPrefix() storage.DataReference { return r0 } +type OutputFilePaths_GetDeckPath struct { + *mock.Call +} + +func (_m OutputFilePaths_GetDeckPath) Return(_a0 storage.DataReference) *OutputFilePaths_GetDeckPath { + return &OutputFilePaths_GetDeckPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputFilePaths) OnGetDeckPath() *OutputFilePaths_GetDeckPath { + c := _m.On("GetDeckPath") + return &OutputFilePaths_GetDeckPath{Call: c} +} + +func (_m *OutputFilePaths) OnGetDeckPathMatch(matchers ...interface{}) *OutputFilePaths_GetDeckPath { + c := _m.On("GetDeckPath", matchers...) + return &OutputFilePaths_GetDeckPath{Call: c} +} + +// GetDeckPath provides a mock function with given fields: +func (_m *OutputFilePaths) GetDeckPath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type OutputFilePaths_GetErrorPath struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/io/mocks/output_reader.go b/go/tasks/pluginmachinery/io/mocks/output_reader.go index f48b51acab..33ced47428 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_reader.go +++ b/go/tasks/pluginmachinery/io/mocks/output_reader.go @@ -9,6 +9,8 @@ import ( io "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" mock "github.com/stretchr/testify/mock" + + storage "github.com/flyteorg/flytestdlib/storage" ) // OutputReader is an autogenerated mock type for the OutputReader type @@ -55,6 +57,40 @@ func (_m *OutputReader) Exists(ctx context.Context) (bool, error) { return r0, r1 } +type OutputReader_GetDeckPath struct { + *mock.Call +} + +func (_m OutputReader_GetDeckPath) Return(_a0 *storage.DataReference) *OutputReader_GetDeckPath { + return &OutputReader_GetDeckPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputReader) OnGetDeckPath() *OutputReader_GetDeckPath { + c := _m.On("GetDeckPath") + return &OutputReader_GetDeckPath{Call: c} +} + +func (_m *OutputReader) OnGetDeckPathMatch(matchers ...interface{}) *OutputReader_GetDeckPath { + c := _m.On("GetDeckPath", matchers...) + return &OutputReader_GetDeckPath{Call: c} +} + +// GetDeckPath provides a mock function with given fields: +func (_m *OutputReader) GetDeckPath() *storage.DataReference { + ret := _m.Called() + + var r0 *storage.DataReference + if rf, ok := ret.Get(0).(func() *storage.DataReference); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage.DataReference) + } + } + + return r0 +} + type OutputReader_IsError struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/io/mocks/output_writer.go b/go/tasks/pluginmachinery/io/mocks/output_writer.go index 4fb079d687..c261e5844e 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_writer.go +++ b/go/tasks/pluginmachinery/io/mocks/output_writer.go @@ -48,6 +48,38 @@ func (_m *OutputWriter) GetCheckpointPrefix() storage.DataReference { return r0 } +type OutputWriter_GetDeckPath struct { + *mock.Call +} + +func (_m OutputWriter_GetDeckPath) Return(_a0 storage.DataReference) *OutputWriter_GetDeckPath { + return &OutputWriter_GetDeckPath{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputWriter) OnGetDeckPath() *OutputWriter_GetDeckPath { + c := _m.On("GetDeckPath") + return &OutputWriter_GetDeckPath{Call: c} +} + +func (_m *OutputWriter) OnGetDeckPathMatch(matchers ...interface{}) *OutputWriter_GetDeckPath { + c := _m.On("GetDeckPath", matchers...) + return &OutputWriter_GetDeckPath{Call: c} +} + +// GetDeckPath provides a mock function with given fields: +func (_m *OutputWriter) GetDeckPath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type OutputWriter_GetErrorPath struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go index bdcb4bc764..1db6b0a800 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go @@ -4,12 +4,15 @@ import ( "context" "fmt" + "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ) type InMemoryOutputReader struct { literals *core.LiteralMap + DeckPath *storage.DataReference err *io.ExecutionError } @@ -40,9 +43,14 @@ func (r InMemoryOutputReader) Read(ctx context.Context) (*core.LiteralMap, *io.E return r.literals, r.err, nil } -func NewInMemoryOutputReader(literals *core.LiteralMap, err *io.ExecutionError) InMemoryOutputReader { +func (r InMemoryOutputReader) GetDeckPath() *storage.DataReference { + return r.DeckPath +} + +func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader { return InMemoryOutputReader{ literals: literals, + DeckPath: DeckPath, err: err, } } diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go new file mode 100644 index 0000000000..9b2be03a5a --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go @@ -0,0 +1,44 @@ +package ioutils + +import ( + "context" + "testing" + + flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/storage" + "github.com/stretchr/testify/assert" +) + +func TestInMemoryOutputReader(t *testing.T) { + deckPath := storage.DataReference("s3://bucket/key") + lt := map[string]*flyteIdlCore.Literal{ + "results": { + Value: &flyteIdlCore.Literal_Scalar{ + Scalar: &flyteIdlCore.Scalar{ + Value: &flyteIdlCore.Scalar_Primitive{ + Primitive: &flyteIdlCore.Primitive{Value: &flyteIdlCore.Primitive_Integer{Integer: 3}}, + }, + }, + }, + }, + } + or := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: lt}, &deckPath, nil) + + assert.Equal(t, &deckPath, or.GetDeckPath()) + ctx := context.TODO() + + ok, err := or.IsError(ctx) + assert.False(t, ok) + assert.NoError(t, err) + + assert.False(t, or.IsFile(ctx)) + + ok, err = or.Exists(ctx) + assert.True(t, ok) + assert.NoError(t, err) + + literalMap, executionErr, err := or.Read(ctx) + assert.Equal(t, lt, literalMap.Literals) + assert.Nil(t, executionErr) + assert.NoError(t, err) +} diff --git a/go/tasks/pluginmachinery/ioutils/paths.go b/go/tasks/pluginmachinery/ioutils/paths.go index 3c997c2d7c..e50aa5484f 100644 --- a/go/tasks/pluginmachinery/ioutils/paths.go +++ b/go/tasks/pluginmachinery/ioutils/paths.go @@ -20,6 +20,9 @@ const ( // OutputsSuffix specifies that outputs are assumed to be written to this "file"/"suffix" under the given prefix // The outputs file has a format of core.LiteralMap OutputsSuffix = "outputs.pb" + // deckSuffix specifies that deck file are assumed to be written to this "file"/"suffix" under the given prefix + // The deck file has a format of HTML + deckSuffix = "deck.html" // ErrorsSuffix specifies that the errors are written to this prefix/file under the given prefix. The Error File // has a format of core.ErrorDocument ErrorsSuffix = "error.pb" diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index 20be4ca05b..781608260c 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -114,6 +114,11 @@ func (r RemoteFileOutputReader) IsFile(ctx context.Context) bool { return true } +func (r RemoteFileOutputReader) GetDeckPath() *storage.DataReference { + path := r.outPath.GetDeckPath() + return &path +} + func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader { return RemoteFileOutputReader{ outPath: outPaths, diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go index d1db7b89ac..4bfa74a7e0 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginsIOMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" storageMocks "github.com/flyteorg/flytestdlib/storage/mocks" @@ -16,6 +18,8 @@ func TestReadOrigin(t *testing.T) { opath := &pluginsIOMock.OutputFilePaths{} opath.OnGetErrorPath().Return("") + deckPath := "deck.html" + opath.OnGetDeckPath().Return(storage.DataReference(deckPath)) t.Run("user", func(t *testing.T) { errorDoc := &core.ErrorDocument{ @@ -44,6 +48,7 @@ func TestReadOrigin(t *testing.T) { assert.NoError(t, err) assert.Equal(t, core.ExecutionError_USER, ee.Kind) assert.False(t, ee.IsRecoverable) + assert.Equal(t, deckPath, r.GetDeckPath().String()) }) t.Run("system", func(t *testing.T) { diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go index f41867afda..d60d0c2b65 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go @@ -35,6 +35,10 @@ func (w RemoteFileOutputPaths) GetOutputPath() storage.DataReference { return constructPath(w.store, w.outputPrefix, OutputsSuffix) } +func (w RemoteFileOutputPaths) GetDeckPath() storage.DataReference { + return constructPath(w.store, w.outputPrefix, deckSuffix) +} + func (w RemoteFileOutputPaths) GetErrorPath() storage.DataReference { return constructPath(w.store, w.outputPrefix, ErrorsSuffix) } diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_writer_test.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer_test.go new file mode 100644 index 0000000000..ecca892fcc --- /dev/null +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_writer_test.go @@ -0,0 +1,48 @@ +package ioutils + +import ( + "context" + "testing" + + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/storage" + "github.com/stretchr/testify/assert" +) + +func TestRemoteFileOutputWriter(t *testing.T) { + ctx := context.TODO() + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.Nil(t, err) + + outputPrefix := storage.DataReference("output") + rawOutputPrefix := storage.DataReference("sandbox") + previousCheckpointPath := storage.DataReference("checkpoint") + + checkpointPath := NewCheckpointRemoteFilePaths( + ctx, + memStore, + outputPrefix, + NewRawOutputPaths(ctx, rawOutputPrefix), + previousCheckpointPath, + ) + + t.Run("Test NewCheckpointRemoteFilePaths", func(t *testing.T) { + assert.Equal(t, previousCheckpointPath, checkpointPath.GetPreviousCheckpointsPrefix()) + assert.Equal(t, outputPrefix, checkpointPath.GetOutputPrefixPath()) + + assert.Equal(t, constructPath(memStore, rawOutputPrefix, CheckpointPrefix), checkpointPath.GetCheckpointPrefix()) + assert.Equal(t, constructPath(memStore, outputPrefix, OutputsSuffix), checkpointPath.GetOutputPath()) + assert.Equal(t, constructPath(memStore, outputPrefix, deckSuffix), checkpointPath.GetDeckPath()) + assert.Equal(t, constructPath(memStore, outputPrefix, ErrorsSuffix), checkpointPath.GetErrorPath()) + assert.Equal(t, constructPath(memStore, outputPrefix, FuturesSuffix), checkpointPath.GetFuturesPath()) + }) + + t.Run("Test NewRemoteFileOutputWriter", func(t *testing.T) { + p := NewRemoteFileOutputWriter(ctx, memStore, checkpointPath) + + assert.Equal(t, constructPath(memStore, rawOutputPrefix, CheckpointPrefix), p.GetCheckpointPrefix()) + assert.Equal(t, constructPath(memStore, outputPrefix, OutputsSuffix), p.GetOutputPath()) + assert.Equal(t, constructPath(memStore, outputPrefix, deckSuffix), p.GetDeckPath()) + assert.Equal(t, constructPath(memStore, outputPrefix, ErrorsSuffix), p.GetErrorPath()) + }) +} diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index 232068fe15..846aec4204 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -94,7 +94,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata return nil, err } - if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{ + if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{ ExecutionError: &core2.ExecutionError{ Code: "", Message: subJob.Status.Message, diff --git a/go/tasks/plugins/array/k8s/management.go b/go/tasks/plugins/array/k8s/management.go index 89c5c4a6c8..0253791aed 100644 --- a/go/tasks/plugins/array/k8s/management.go +++ b/go/tasks/plugins/array/k8s/management.go @@ -230,7 +230,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon return currentState, externalResources, err } - if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{ + if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{ ExecutionError: phaseInfo.Err(), IsRecoverable: phaseInfo.Phase() != core.PhasePermanentFailure, })); err != nil { diff --git a/go/tasks/plugins/array/outputs.go b/go/tasks/plugins/array/outputs.go index 531f204877..ed9b08f39f 100644 --- a/go/tasks/plugins/array/outputs.go +++ b/go/tasks/plugins/array/outputs.go @@ -107,7 +107,7 @@ func (w assembleOutputsWorker) Process(ctx context.Context, workItem workqueue.W } ow := ioutils.NewRemoteFileOutputWriter(ctx, i.dataStore, i.outputPaths) - if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil)); err != nil { + if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil, nil)); err != nil { return workqueue.WorkStatusNotDone, err } @@ -313,7 +313,7 @@ func (a assembleErrorsWorker) Process(ctx context.Context, workItem workqueue.Wo } ow := ioutils.NewRemoteFileOutputWriter(ctx, w.dataStore, w.outputPaths) - if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{ + if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{ ExecutionError: &core.ExecutionError{ Code: "", Message: msg, diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index ed6f17cfbb..d0f86f73d1 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -530,7 +530,7 @@ func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentSt }, }, }, - }, nil)) + }, nil, nil)) if err != nil { logger.Errorf(ctx, "Error writing outputs file: [%s]", err) return currentState, err diff --git a/go/tasks/plugins/k8s/sagemaker/builtin_training.go b/go/tasks/plugins/k8s/sagemaker/builtin_training.go index e27e7994bf..0116b4ebfc 100644 --- a/go/tasks/plugins/k8s/sagemaker/builtin_training.go +++ b/go/tasks/plugins/k8s/sagemaker/builtin_training.go @@ -230,7 +230,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForTrainingJob( return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task") } // Instantiate a output reader with the literal map, and write the output to the remote location referred to by the OutputWriter - if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil)); err != nil { + if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil, nil)); err != nil { return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "Unable to write output to the remote location") } logger.Debugf(ctx, "Successfully produced and returned outputs") diff --git a/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go b/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go index b1fba80988..dc6e02eed5 100644 --- a/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go +++ b/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go @@ -257,7 +257,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForHyperparameterTuningJob( logger.Errorf(ctx, "Failed to create outputs, err: %s", err) return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task") } - if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil)); err != nil { + if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil, nil)); err != nil { return pluginsCore.PhaseInfoUndefined, err } logger.Debugf(ctx, "Successfully produced and returned outputs") diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 7d241cb09a..1803d3ded6 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -462,7 +462,7 @@ func writeOutput(ctx context.Context, tCtx core.TaskExecutionContext, externalLo }, }, }, - }, nil)) + }, nil, nil)) } // The 'PhaseInfoRunning' occurs 15 times (3 for each of the 5 Presto queries that get run for every Presto task) which diff --git a/go/tasks/plugins/webapi/athena/utils.go b/go/tasks/plugins/webapi/athena/utils.go index a4b89b4e66..b7f9fd6962 100644 --- a/go/tasks/plugins/webapi/athena/utils.go +++ b/go/tasks/plugins/webapi/athena/utils.go @@ -48,7 +48,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, externalLocatio }, }, }, - }, nil)) + }, nil, nil)) } type QueryInfo struct { diff --git a/go/tasks/plugins/webapi/athena/utils_test.go b/go/tasks/plugins/webapi/athena/utils_test.go index e298b90a25..3686441853 100644 --- a/go/tasks/plugins/webapi/athena/utils_test.go +++ b/go/tasks/plugins/webapi/athena/utils_test.go @@ -111,7 +111,7 @@ func Test_writeOutput(t *testing.T) { }, }, }, - }, nil)).Return(nil) + }, nil, nil)).Return(nil) statusContext.OnOutputWriter().Return(ow) err = writeOutput(context.Background(), statusContext, externalLocation) diff --git a/go/tasks/plugins/webapi/bigquery/plugin.go b/go/tasks/plugins/webapi/bigquery/plugin.go index 8dd45b650b..202937c233 100644 --- a/go/tasks/plugins/webapi/bigquery/plugin.go +++ b/go/tasks/plugins/webapi/bigquery/plugin.go @@ -325,7 +325,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, OutputLocation }, }, }, - }, nil)) + }, nil, nil)) } func handleCreateError(createError *googleapi.Error, taskInfo *core.TaskInfo) core.PhaseInfo {