From 5b8f3f84c40c28c905d44a8a19d09251a6049664 Mon Sep 17 00:00:00 2001 From: General Kroll Date: Wed, 9 Jul 2025 17:12:35 +1000 Subject: [PATCH] insert-returning-asynchronous Summary: - Support for asynchronous `INSERT RETURNINNG` semantics. - Update docs. - Flask app uplift. - Added robot test `Insert Async Returning Simple Projection`. - Async progress messages now routed to `stderr`, prior case was `stdout`. --- .github/workflows/build.yml | 2 + .vscode/launch.json | 1 + docs/developer_guide.md | 2 +- go.mod | 2 +- go.sum | 4 +- .../async_compose.go | 13 ++- .../asyncmonitor.go | 21 +++- .../dependencyplanner/dependencyplanner.go | 1 + .../dependent_simple_integration_test.go | 4 +- .../stackql/driver/driver_integration_test.go | 4 +- .../execution/mono_valent_execution.go | 96 ++++++++++++++++++- .../builder_input/builder_input.go | 28 ++++++ internal/stackql/planbuilder/plan_builder.go | 49 ++++++++-- internal/stackql/primitivebuilder/delete.go | 4 +- internal/stackql/primitivebuilder/exec.go | 5 +- .../primitivebuilder/generic_http_reversal.go | 15 ++- .../generic_http_stream_input.go | 15 ++- .../primitivebuilder/insert_or_update.go | 7 +- .../primitivebuilder/mono_valent_builder.go | 4 +- .../single_acquire_and_select.go | 4 +- .../primitivebuilder/single_select_acquire.go | 2 + .../primitivegenerator/statement_analyzer.go | 3 +- internal/test/stackqltestutil/helper.go | 22 +++++ stackql/main_integration_test.go | 8 +- .../stackql_test_tooling/flask/gcp/app.py | 67 ++++++++++++- .../gcp/templates/global-operation.jinja.json | 15 +++ .../networks-insert-generic-mature.jinja.json | 14 +++ .../v0.1.2/resources/compute-v1.yaml | 3 + .../stackql_mocked_from_cmd_line.robot | 27 ++++++ 29 files changed, 399 insertions(+), 43 deletions(-) rename internal/stackql/{primitivebuilder => asynccompose}/async_compose.go (71%) rename internal/stackql/{primitivebuilder => asynccompose}/asyncmonitor.go (91%) create mode 100644 test/python/stackql_test_tooling/flask/gcp/templates/global-operation.jinja.json create mode 100644 test/python/stackql_test_tooling/flask/gcp/templates/networks-insert-generic-mature.jinja.json diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b8922540..03a57bf0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1461,6 +1461,8 @@ jobs: dockertest: + env: + IS_DOCKER: 'true' name: Docker Test needs: - dockerbuild diff --git a/.vscode/launch.json b/.vscode/launch.json index a2b92513..e81b7264 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -176,6 +176,7 @@ "select rings.projectsId as project, rings.locationsId as locale, split_part(rings.name, '/', -1) as key_ring_name, split_part(keys.name, '/', -1) as key_name, json_extract(keys.\"versionTemplate\", '$.algorithm') as key_algorithm, json_extract(keys.\"versionTemplate\", '$.protectionLevel') as key_protection_level from google.cloudkms.key_rings rings inner join google.cloudkms.crypto_keys keys on keys.keyRingsId = split_part(rings.name, '/', -1) and keys.projectsId = rings.projectsId and keys.locationsId = rings.locationsId where rings.projectsId in ('testing-project', 'testing-project-two', 'testing-project-three') and rings.locationsId in ('global', 'australia-southeast1', 'australia-southeast2') order by project, locale, key_name ;", "delete from aws.cloud_control.resources where region = 'ap-southeast-1' and data__TypeName = 'AWS::Logs::LogGroup' and data__Identifier = 'LogGroupResourceExampleThird' ;", "insert into google.storage.buckets( project, data__name) select 'testing-project', 'silly-bucket' returning projectNumber;", + "insert /*+ AWAIT */ into google.compute.networks(project, data__name, data__autoCreateSubnetworks) select 'mutable-project', 'auto-test-01', false returning creationTimestamp, name;", ], "default": "show providers;" }, diff --git a/docs/developer_guide.md b/docs/developer_guide.md index a792d847..90f3b5ed 100644 --- a/docs/developer_guide.md +++ b/docs/developer_guide.md @@ -274,7 +274,7 @@ https://docs.aws.amazon.com/sdk-for-go/api/aws/signer/v4/ `INSERT RETURNING` can function in two mechanisms: - Synchronous responses, such as [`google.storage.buckets`](https://cloud.google.com/storage/docs/json_api/v1/buckets/insert). The returning clause is a projection on the immediately available reponse body. -- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded. +- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert) and [`google.compute.networks`](https://cloud.google.com/compute/docs/reference/rest/v1/networks/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded. Future use cases for `UPDATE RETURNING`, `REPLACE RETURNING` and `DELETE RETURNING` will function the same observable fashion. diff --git a/go.mod b/go.mod index 7adfcc70..45192424 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.10.1 - github.com/stackql/any-sdk v0.1.3-beta02 + github.com/stackql/any-sdk v0.1.4-alpha06 github.com/stackql/go-suffix-map v0.0.1-alpha01 github.com/stackql/psql-wire v0.1.1-beta23 github.com/stackql/stackql-parser v0.0.15-alpha06 diff --git a/go.sum b/go.sum index 01ecdef2..4902a5d8 100644 --- a/go.sum +++ b/go.sum @@ -484,8 +484,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk= github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU= -github.com/stackql/any-sdk v0.1.3-beta02 h1:0jSwyYFddjAN++U+yNNLF7SMLPIIaRhv522UzeWDf2E= -github.com/stackql/any-sdk v0.1.3-beta02/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8= +github.com/stackql/any-sdk v0.1.4-alpha06 h1:QJPf3ehPrRqmYZR+TmD897AsmsaOamHErLhaE5B9v/w= +github.com/stackql/any-sdk v0.1.4-alpha06/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8= github.com/stackql/go-suffix-map v0.0.1-alpha01 h1:TDUDS8bySu41Oo9p0eniUeCm43mnRM6zFEd6j6VUaz8= github.com/stackql/go-suffix-map v0.0.1-alpha01/go.mod h1:QAi+SKukOyf4dBtWy8UMy+hsXXV+yyEE4vmBkji2V7g= github.com/stackql/psql-wire v0.1.1-beta23 h1:1ayYMjZArfDcIMyEOKnm+Bp1zRCISw8pguvTFuUhhVQ= diff --git a/internal/stackql/primitivebuilder/async_compose.go b/internal/stackql/asynccompose/async_compose.go similarity index 71% rename from internal/stackql/primitivebuilder/async_compose.go rename to internal/stackql/asynccompose/async_compose.go index 3f0c0017..4cd5adbb 100644 --- a/internal/stackql/primitivebuilder/async_compose.go +++ b/internal/stackql/asynccompose/async_compose.go @@ -1,22 +1,26 @@ -package primitivebuilder +package asynccompose import ( "github.com/stackql/any-sdk/anysdk" "github.com/stackql/stackql-parser/go/vt/sqlparser" + "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/handler" "github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto" "github.com/stackql/stackql/internal/stackql/primitive" "github.com/stackql/stackql/internal/stackql/provider" ) -func composeAsyncMonitor( +func ComposeAsyncMonitor( handlerCtx handler.HandlerContext, precursor primitive.IPrimitive, prov provider.IProvider, method anysdk.OperationStore, commentDirectives sqlparser.CommentDirectives, + isReturning bool, + insertCtx drm.PreparedStatementCtx, + drmCfg drm.Config, ) (primitive.IPrimitive, error) { - asm, err := NewAsyncMonitor(handlerCtx, prov, method) + asm, err := NewAsyncMonitor(handlerCtx, prov, method, isReturning) if err != nil { return nil, err } @@ -31,7 +35,8 @@ func composeAsyncMonitor( handlerCtx.GetOutfile(), handlerCtx.GetOutErrFile(), ) - primitive, err := asm.GetMonitorPrimitive(prov, method, precursor, pl, commentDirectives) + primitive, err := asm.GetMonitorPrimitive( + prov, method, precursor, pl, commentDirectives, isReturning, insertCtx, drmCfg) if err != nil { return nil, err } diff --git a/internal/stackql/primitivebuilder/asyncmonitor.go b/internal/stackql/asynccompose/asyncmonitor.go similarity index 91% rename from internal/stackql/primitivebuilder/asyncmonitor.go rename to internal/stackql/asynccompose/asyncmonitor.go index 4752b53a..85ae73cc 100644 --- a/internal/stackql/primitivebuilder/asyncmonitor.go +++ b/internal/stackql/asynccompose/asyncmonitor.go @@ -1,4 +1,4 @@ -package primitivebuilder +package asynccompose import ( "fmt" @@ -6,6 +6,7 @@ import ( "github.com/stackql/any-sdk/anysdk" "github.com/stackql/stackql/internal/stackql/acid/binlog" + "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/execution" "github.com/stackql/stackql/internal/stackql/handler" "github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto" @@ -22,6 +23,9 @@ type IAsyncMonitor interface { precursor primitive.IPrimitive, initialCtx primitive.IPrimitiveCtx, comments sqlparser.CommentDirectives, + isReturning bool, + insertCtx drm.PreparedStatementCtx, + drmCfg drm.Config, ) (primitive.IPrimitive, error) } @@ -120,11 +124,12 @@ func NewAsyncMonitor( handlerCtx handler.HandlerContext, prov provider.IProvider, op anysdk.OperationStore, + isReturning bool, ) (IAsyncMonitor, error) { //nolint:gocritic //TODO: refactor switch prov.GetProviderString() { case "google": - return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion()) + return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion(), isReturning) } return nil, fmt.Errorf( "async operation monitor for provider = '%s', api version = '%s' currently not supported", @@ -136,6 +141,7 @@ func newGoogleAsyncMonitor( prov provider.IProvider, op anysdk.OperationStore, version string, //nolint:unparam // TODO: refactor + isReturning bool, //nolint:unparam,revive // TODO: refactor ) (IAsyncMonitor, error) { //nolint:gocritic //TODO: refactor switch version { @@ -160,11 +166,14 @@ func (gm *DefaultGoogleAsyncMonitor) GetMonitorPrimitive( precursor primitive.IPrimitive, initialCtx primitive.IPrimitiveCtx, comments sqlparser.CommentDirectives, + isReturning bool, + insertCtx drm.PreparedStatementCtx, + drmCfg drm.Config, ) (primitive.IPrimitive, error) { //nolint:gocritic,staticcheck //TODO: refactor switch strings.ToLower(prov.GetVersion()) { default: - return gm.getV1Monitor(prov, op, precursor, initialCtx, comments) + return gm.getV1Monitor(prov, op, precursor, initialCtx, comments, isReturning, insertCtx, drmCfg) } } @@ -174,6 +183,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor( precursor primitive.IPrimitive, initialCtx primitive.IPrimitiveCtx, comments sqlparser.CommentDirectives, + isReturning bool, + insertCtx drm.PreparedStatementCtx, + drmCfg drm.Config, ) (primitive.IPrimitive, error) { provider, providerErr := prov.GetProvider() if providerErr != nil { @@ -186,6 +198,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor( precursor, initialCtx, comments, + isReturning, + insertCtx, + drmCfg, ) if exPrepErr != nil { return nil, exPrepErr diff --git a/internal/stackql/dependencyplanner/dependencyplanner.go b/internal/stackql/dependencyplanner/dependencyplanner.go index 46e1b114..560d2713 100644 --- a/internal/stackql/dependencyplanner/dependencyplanner.go +++ b/internal/stackql/dependencyplanner/dependencyplanner.go @@ -485,6 +485,7 @@ func (dp *standardDependencyPlanner) orchestrate( insPsc, nil, outStream, + false, // returning hardcoded to false for now ) } dp.execSlice = append(dp.execSlice, builder) diff --git a/internal/stackql/driver/dependent_simple_integration_test.go b/internal/stackql/driver/dependent_simple_integration_test.go index 1b9c4261..3a44d081 100644 --- a/internal/stackql/driver/dependent_simple_integration_test.go +++ b/internal/stackql/driver/dependent_simple_integration_test.go @@ -38,7 +38,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsync(t *testing.T) { if err != nil { t.Fatalf("Test failed: %v", err) } - handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true) + handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true) if err != nil { t.Fatalf("Test failed: %v", err) } @@ -76,7 +76,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsyncReversed(t *testing.T) { if err != nil { t.Fatalf("Test failed: %v", err) } - handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true) + handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true) if err != nil { t.Fatalf("Test failed: %v", err) } diff --git a/internal/stackql/driver/driver_integration_test.go b/internal/stackql/driver/driver_integration_test.go index 636fdfb8..355e3ffd 100644 --- a/internal/stackql/driver/driver_integration_test.go +++ b/internal/stackql/driver/driver_integration_test.go @@ -180,7 +180,7 @@ func TestSimpleInsertGoogleComputeNetworkAsync(t *testing.T) { } testSubject := func(t *testing.T, outFile *bufio.Writer) { - handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true) + handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true) if err != nil { t.Fatalf("Test failed: %v", err) } @@ -226,7 +226,7 @@ func TestK8sTheHardWayAsync(t *testing.T) { runtimeCtx.InfilePath = k8sthwRenderedFile runtimeCtx.CSVHeadersDisable = true - handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true) + handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true) if err != nil { t.Fatalf("Test failed: %v", err) } diff --git a/internal/stackql/execution/mono_valent_execution.go b/internal/stackql/execution/mono_valent_execution.go index 620096e5..fb3a0798 100644 --- a/internal/stackql/execution/mono_valent_execution.go +++ b/internal/stackql/execution/mono_valent_execution.go @@ -1130,6 +1130,7 @@ func (sp *standardProcessor) Process() ProcessorResponse { if httpResponseErr != nil { return newHTTPProcessorResponse(nil, reversalStream, false, httpResponseErr) } + // TODO: add async monitor here processed, resErr := method.ProcessResponse(httpResponse) if resErr != nil { if isSkipResponse && isMutation && httpResponse.StatusCode < 300 { @@ -1444,6 +1445,32 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i return ex, nil } +func shimProcessHTTP( + url string, + rtCtx dto.RuntimeCtx, + authCtx *dto.AuthCtx, + provider anysdk.Provider, + m anysdk.OperationStore, + outErrFile io.Writer, +) (*http.Response, error) { + req, monitorReqErr := anysdk.GetMonitorRequest(url) + if monitorReqErr != nil { + return nil, monitorReqErr + } + cc := anysdk.NewAnySdkClientConfigurator(rtCtx, provider.GetName()) + anySdkResponse, apiErr := anysdk.CallFromSignature( + cc, rtCtx, authCtx, authCtx.Type, false, outErrFile, provider, anysdk.NewAnySdkOpStoreDesignation(m), req) + + if apiErr != nil { + return nil, apiErr + } + httpResponse, httpResponseErr := anySdkResponse.GetHttpResponse() + if httpResponseErr != nil { + return nil, httpResponseErr + } + return httpResponse, nil +} + //nolint:funlen,gocognit // acceptable for now func GetMonitorExecutor( handlerCtx handler.HandlerContext, @@ -1452,6 +1479,9 @@ func GetMonitorExecutor( precursor primitive.IPrimitive, initialCtx primitive.IPrimitiveCtx, comments sqlparser.CommentDirectives, + isReturning bool, + insertCtx drm.PreparedStatementCtx, + drmCfg drm.Config, ) (primitive.IPrimitive, error) { m := op // tableName, err := mv.tableMeta.GetTableName() @@ -1471,6 +1501,8 @@ func GetMonitorExecutor( elapsedSeconds: 0, pollIntervalSeconds: MonitorPollIntervalSeconds, comments: comments, + insertCtx: insertCtx, + drmCfg: drmCfg, } if comments != nil { asyncPrim.noStatus = comments.IsSet("NOSTATUS") @@ -1498,7 +1530,64 @@ func GetMonitorExecutor( operationDescriptor := getOpDescriptor(body) endTime, endTimeOk := body["endTime"] + prStr := provider.GetName() + //nolint:nestif // acceptable for now if endTimeOk && endTime != "" { + targetLink, targetLinkOK := body["targetLink"] + if targetLinkOK && isReturning { + authCtx, authErr := pc.GetAuthContext(prStr) + if authErr != nil { + return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr) + } + if authCtx == nil { + return internaldto.NewExecutorOutput(nil, nil, nil, nil, fmt.Errorf("cannot execute monitor: no auth context")) + } + targetLinkStr, targetLinkStrOk := targetLink.(string) + if !targetLinkStrOk { + return internaldto.NewExecutorOutput( + nil, + nil, + nil, + nil, + fmt.Errorf("cannot execute monitor: 'targetLink' is not a string"), + ) + } + httpResponse, httpResponseErr := shimProcessHTTP( + targetLinkStr, + rtCtx, + authCtx, + provider, + m, + outErrFile, + ) + if httpResponseErr != nil { + return internaldto.NewExecutorOutput(nil, nil, nil, nil, httpResponseErr) + } + + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + target, targetErr := m.DeprecatedProcessResponse(httpResponse) + handlerCtx.LogHTTPResponseMap(target) + if targetErr != nil { + return internaldto.NewExecutorOutput(nil, nil, nil, nil, targetErr) + } + // TODO: insert into table here + if isReturning { + if asyncPrim.insertCtx != nil { + _, rErr := asyncPrim.drmCfg.ExecuteInsertDML( + handlerCtx.GetSQLEngine(), + asyncPrim.insertCtx, + target, + "", // TODO: figure out how on earth to compute this encoding + ) + if rErr != nil { + return internaldto.NewExecutorOutput(nil, nil, nil, nil, rErr) + } + } + } + return prepareResultSet(&asyncPrim, pc, target, operationDescriptor) + } return prepareResultSet(&asyncPrim, pc, body, operationDescriptor) } url, ok := body["selfLink"] @@ -1511,7 +1600,6 @@ func GetMonitorExecutor( fmt.Errorf("cannot execute monitor: no 'selfLink' property present"), ) } - prStr := provider.GetName() authCtx, authErr := pc.GetAuthContext(prStr) if authErr != nil { return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr) @@ -1523,7 +1611,7 @@ func GetMonitorExecutor( asyncPrim.elapsedSeconds += asyncPrim.pollIntervalSeconds if !asyncPrim.noStatus { //nolint:errcheck //TODO: handle error - pc.GetWriter().Write( + pc.GetErrWriter().Write( []byte( fmt.Sprintf( "%s in progress, %d seconds elapsed", @@ -1649,6 +1737,8 @@ type asyncHTTPMonitorPrimitive struct { noStatus bool id int64 comments sqlparser.CommentDirectives + insertCtx drm.PreparedStatementCtx + drmCfg drm.Config } func (pr *asyncHTTPMonitorPrimitive) SetTxnID(_ int) { @@ -1762,7 +1852,7 @@ func prepareResultSet( } if !prim.noStatus { //nolint:errcheck //TODO: handle error - pc.GetWriter().Write([]byte(fmt.Sprintf("%s complete", operationDescriptor) + fmt.Sprintln(""))) + pc.GetErrWriter().Write([]byte(fmt.Sprintf("%s complete", operationDescriptor) + fmt.Sprintln(""))) } return util.PrepareResultSet(payload) } diff --git a/internal/stackql/internal_data_transfer/builder_input/builder_input.go b/internal/stackql/internal_data_transfer/builder_input/builder_input.go index 4fa6ee51..145cb9af 100644 --- a/internal/stackql/internal_data_transfer/builder_input/builder_input.go +++ b/internal/stackql/internal_data_transfer/builder_input/builder_input.go @@ -5,6 +5,7 @@ import ( "github.com/stackql/any-sdk/pkg/streaming" "github.com/stackql/stackql-parser/go/vt/sqlparser" "github.com/stackql/stackql/internal/stackql/astanalysis/annotatedast" + "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/handler" "github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto" "github.com/stackql/stackql/internal/stackql/primitivegraph" @@ -31,6 +32,8 @@ type BuilderInput interface { GetOperationStore() (anysdk.OperationStore, bool) SetOperationStore(op anysdk.OperationStore) IsAwait() bool + IsReturning() bool + SetIsReturning(bool) GetVerb() string GetInputAlias() string IsUndo() bool @@ -54,6 +57,8 @@ type BuilderInput interface { GetTxnCtrlCtrs() (internaldto.TxnControlCounters, bool) GetTableInsertionContainer() (tableinsertioncontainer.TableInsertionContainer, bool) SetTableInsertionContainer(tableinsertioncontainer.TableInsertionContainer) + SetInsertCtx(insertCtx drm.PreparedStatementCtx) + GetInsertCtx() (drm.PreparedStatementCtx, bool) } type builderInput struct { @@ -64,6 +69,7 @@ type builderInput struct { dependencyNode primitivegraph.PrimitiveNode commentDirectives sqlparser.CommentDirectives isAwait bool + isReturning bool verb string inputAlias string isUndo bool @@ -76,6 +82,7 @@ type builderInput struct { isTargetPhysical bool txnCtrlCtrs internaldto.TxnControlCounters tableInsertionContainer tableinsertioncontainer.TableInsertionContainer + insertCtx drm.PreparedStatementCtx } func NewBuilderInput( @@ -92,6 +99,25 @@ func NewBuilderInput( } } +func (bi *builderInput) SetInsertCtx(insertCtx drm.PreparedStatementCtx) { + bi.insertCtx = insertCtx +} + +func (bi *builderInput) GetInsertCtx() (drm.PreparedStatementCtx, bool) { + if bi.insertCtx == nil { + return nil, false + } + return bi.insertCtx, true +} + +func (bi *builderInput) IsReturning() bool { + return bi.isReturning +} + +func (bi *builderInput) SetIsReturning(isReturning bool) { + bi.isReturning = isReturning +} + func (bi *builderInput) GetTableInsertionContainer() (tableinsertioncontainer.TableInsertionContainer, bool) { return bi.tableInsertionContainer, bi.tableInsertionContainer != nil } @@ -260,5 +286,7 @@ func (bi *builderInput) Clone() BuilderInput { isTargetPhysical: bi.isTargetPhysical, annotatedAst: bi.annotatedAst, txnCtrlCtrs: bi.txnCtrlCtrs, + isReturning: bi.isReturning, + insertCtx: bi.insertCtx, } } diff --git a/internal/stackql/planbuilder/plan_builder.go b/internal/stackql/planbuilder/plan_builder.go index e9dfb79d..9ca38730 100644 --- a/internal/stackql/planbuilder/plan_builder.go +++ b/internal/stackql/planbuilder/plan_builder.go @@ -10,6 +10,7 @@ import ( "github.com/stackql/any-sdk/anysdk" "github.com/stackql/any-sdk/pkg/logging" + "github.com/stackql/any-sdk/pkg/streaming" "github.com/stackql/stackql/internal/stackql/acid/txn_context" "github.com/stackql/stackql/internal/stackql/astanalysis/routeanalysis" "github.com/stackql/stackql/internal/stackql/handler" @@ -912,7 +913,8 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild ) bldrInput.SetDependencyNode(selectPrimitiveNode) bldrInput.SetCommentDirectives(primitiveGenerator.GetPrimitiveComposer().GetCommentDirectives()) - bldrInput.SetIsAwait(primitiveGenerator.GetPrimitiveComposer().IsAwait()) + isAwait := primitiveGenerator.GetPrimitiveComposer().IsAwait() + bldrInput.SetIsAwait(isAwait) bldrInput.SetParserNode(node) bldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST()) bldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs()) @@ -925,7 +927,6 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild // Two cases: // 1. Synchronous. Equivalent to select. // 2. Asynchronous. Whole other story. - // Synchronous only for now... tableMeta, tableMetaExists := bldrInput.GetTableMetadata() if !tableMetaExists { return fmt.Errorf("could not obtain table metadata for node '%s'", node.Action) @@ -939,12 +940,44 @@ func (pgb *standardPlanGraphBuilder) handleInsert(pbi planbuilderinput.PlanBuild return rcErr } bldrInput.SetTableInsertionContainer(rc) - bldr = primitivebuilder.NewSingleAcquireAndSelect( - bldrInput, - primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(), - primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(), - nil, - ) + bldrInput.SetIsReturning(true) + if !isAwait { + bldr = primitivebuilder.NewSingleAcquireAndSelect( + bldrInput, + primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx(), + primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(), + nil, + ) + } else { + bldrInput.SetIsAwait(true) + bldrInput.SetIsReturning(true) + bldrInput.SetInsertCtx(primitiveGenerator.GetPrimitiveComposer().GetInsertPreparedStatementCtx()) + lhsBldr := primitivebuilder.NewInsertOrUpdate( + bldrInput, + ) + newBldrInput := builder_input.NewBuilderInput( + pgb.planGraphHolder, + handlerCtx, + tbl, + ) + newBldrInput.SetParserNode(node) + newBldrInput.SetAnnotatedAST(pbi.GetAnnotatedAST()) + newBldrInput.SetTxnCtrlCtrs(pbi.GetTxnCtrlCtrs()) + newBldrInput.SetTableInsertionContainer(rc) + newBldrInput.SetDependencyNode(selectPrimitiveNode) + newBldrInput.SetIsAwait(isAwait) + rhsBldr := primitivebuilder.NewSingleSelect( + pgb.planGraphHolder, handlerCtx, primitiveGenerator.GetPrimitiveComposer().GetSelectPreparedStatementCtx(), + []tableinsertioncontainer.TableInsertionContainer{rc}, + nil, + streaming.NewNopMapStream(), + ) + bldr = primitivebuilder.NewDependencySubDAGBuilder( + pgb.planGraphHolder, + []primitivebuilder.Builder{lhsBldr}, + rhsBldr, + ) + } } else { bldr = primitivebuilder.NewInsertOrUpdate( bldrInput, diff --git a/internal/stackql/primitivebuilder/delete.go b/internal/stackql/primitivebuilder/delete.go index 212141b9..62fea2bb 100644 --- a/internal/stackql/primitivebuilder/delete.go +++ b/internal/stackql/primitivebuilder/delete.go @@ -3,6 +3,7 @@ package primitivebuilder import ( "github.com/stackql/any-sdk/pkg/streaming" "github.com/stackql/stackql-parser/go/vt/sqlparser" + "github.com/stackql/stackql/internal/stackql/asynccompose" "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/execution" "github.com/stackql/stackql/internal/stackql/handler" @@ -93,7 +94,8 @@ func (ss *Delete) Build() error { primitive_context.NewPrimitiveContext(), ) if ss.isAwait { - deletePrimitive, err = composeAsyncMonitor(handlerCtx, deletePrimitive, prov, method, nil) + deletePrimitive, err = asynccompose.ComposeAsyncMonitor( + handlerCtx, deletePrimitive, prov, method, nil, false, nil, nil) // isReturning hardcoded to false for now } if err != nil { return err diff --git a/internal/stackql/primitivebuilder/exec.go b/internal/stackql/primitivebuilder/exec.go index a8698543..f8919982 100644 --- a/internal/stackql/primitivebuilder/exec.go +++ b/internal/stackql/primitivebuilder/exec.go @@ -3,6 +3,7 @@ package primitivebuilder import ( "github.com/stackql/any-sdk/anysdk" "github.com/stackql/stackql-parser/go/vt/sqlparser" + "github.com/stackql/stackql/internal/stackql/asynccompose" "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/execution" "github.com/stackql/stackql/internal/stackql/handler" @@ -174,7 +175,9 @@ func (ss *Exec) Build() error { ss.graph.CreatePrimitiveNode(execPrimitive) return nil } - pr, err := composeAsyncMonitor(handlerCtx, execPrimitive, prov, m, nil) + pr, err := asynccompose.ComposeAsyncMonitor( + handlerCtx, execPrimitive, prov, m, + nil, false, nil, nil) // returning hardcoded to false for now if err != nil { return err } diff --git a/internal/stackql/primitivebuilder/generic_http_reversal.go b/internal/stackql/primitivebuilder/generic_http_reversal.go index 04dd737c..05b84224 100644 --- a/internal/stackql/primitivebuilder/generic_http_reversal.go +++ b/internal/stackql/primitivebuilder/generic_http_reversal.go @@ -6,6 +6,7 @@ import ( "github.com/stackql/any-sdk/anysdk" "github.com/stackql/stackql-parser/go/vt/sqlparser" "github.com/stackql/stackql/internal/stackql/acid/binlog" + "github.com/stackql/stackql/internal/stackql/asynccompose" "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/execution" "github.com/stackql/stackql/internal/stackql/handler" @@ -21,10 +22,12 @@ type genericHTTPReversal struct { graphHolder primitivegraph.PrimitiveGraphHolder handlerCtx handler.HandlerContext drmCfg drm.Config + insertCtx drm.PreparedStatementCtx root primitivegraph.PrimitiveNode op anysdk.OperationStore commentDirectives sqlparser.CommentDirectives isAwait bool + isReturning bool verb string // may be "insert" or "update" inputAlias string isUndo bool @@ -56,7 +59,7 @@ func newGenericHTTPReversal( return nil, fmt.Errorf("provider is required") } - return &genericHTTPReversal{ + rv := &genericHTTPReversal{ prov: prov, graphHolder: graphHolder, handlerCtx: handlerCtx, @@ -67,7 +70,12 @@ func newGenericHTTPReversal( verb: builderInput.GetVerb(), inputAlias: builderInput.GetInputAlias(), isUndo: builderInput.IsUndo(), - }, nil + } + insertCtx, insertCtxExists := builderInput.GetInsertCtx() + if insertCtxExists { + rv.insertCtx = insertCtx + } + return rv, nil } func (gh *genericHTTPReversal) GetRoot() primitivegraph.PrimitiveNode { @@ -206,7 +214,8 @@ func (gh *genericHTTPReversal) Build() error { if err != nil { return internaldto.NewErroneousExecutorOutput(err) } - execPrim, execErr := composeAsyncMonitor(handlerCtx, dependentInsertPrimitive, prov, m, commentDirectives) + execPrim, execErr := asynccompose.ComposeAsyncMonitor( + handlerCtx, dependentInsertPrimitive, prov, m, commentDirectives, gh.isReturning, gh.insertCtx, gh.drmCfg) if execErr != nil { return internaldto.NewErroneousExecutorOutput(execErr) } diff --git a/internal/stackql/primitivebuilder/generic_http_stream_input.go b/internal/stackql/primitivebuilder/generic_http_stream_input.go index 5bb42525..87f95e22 100644 --- a/internal/stackql/primitivebuilder/generic_http_stream_input.go +++ b/internal/stackql/primitivebuilder/generic_http_stream_input.go @@ -10,6 +10,7 @@ import ( "github.com/stackql/any-sdk/pkg/logging" "github.com/stackql/stackql-parser/go/vt/sqlparser" "github.com/stackql/stackql/internal/stackql/acid/binlog" + "github.com/stackql/stackql/internal/stackql/asynccompose" "github.com/stackql/stackql/internal/stackql/drm" "github.com/stackql/stackql/internal/stackql/execution" "github.com/stackql/stackql/internal/stackql/handler" @@ -27,11 +28,13 @@ type genericHTTPStreamInput struct { handlerCtx handler.HandlerContext drmCfg drm.Config root primitivegraph.PrimitiveNode + tail primitivegraph.PrimitiveNode tbl tablemetadata.ExtendedTableMetadata commentDirectives sqlparser.CommentDirectives dependencyNode primitivegraph.PrimitiveNode parserNode sqlparser.SQLNode isAwait bool + isReturning bool verb string // may be "insert" or "update" inputAlias string isUndo bool @@ -39,6 +42,7 @@ type genericHTTPStreamInput struct { reversalStream anysdk.HttpPreparatorStream reversalBuilder Builder rollbackType constants.RollbackType + insertCtx drm.PreparedStatementCtx } func newGenericHTTPStreamInput( @@ -62,6 +66,7 @@ func newGenericHTTPStreamInput( return nil, fmt.Errorf("dependency node is required") } parserNode, _ := builderInput.GetParserNode() + insertCtx, _ := builderInput.GetInsertCtx() return &genericHTTPStreamInput{ graphHolder: graphHolder, handlerCtx: handlerCtx, @@ -70,12 +75,14 @@ func newGenericHTTPStreamInput( commentDirectives: commentDirectives, dependencyNode: dependencyNode, isAwait: builderInput.IsAwait(), + isReturning: builderInput.IsReturning(), verb: builderInput.GetVerb(), inputAlias: builderInput.GetInputAlias(), isUndo: builderInput.IsUndo(), parserNode: parserNode, reversalStream: anysdk.NewHttpPreparatorStream(), rollbackType: handlerCtx.GetRollbackType(), + insertCtx: insertCtx, }, nil } @@ -88,6 +95,9 @@ func (gh *genericHTTPStreamInput) GetRoot() primitivegraph.PrimitiveNode { } func (gh *genericHTTPStreamInput) GetTail() primitivegraph.PrimitiveNode { + if gh.tail != nil { + return gh.tail + } return gh.root } @@ -179,6 +189,7 @@ func (gh *genericHTTPStreamInput) Build() error { reverseInput.SetHTTPPreparatorStream(gh.reversalStream) reverseInput.SetOperationStore(inverseOpStore) reverseInput.SetProvider(prov) + reverseInput.SetInsertCtx(gh.insertCtx) gh.reversalBuilder, reversalBuildInitErr = newGenericHTTPReversal(reverseInput) if reversalBuildInitErr != nil { return reversalBuildInitErr @@ -357,7 +368,8 @@ func (gh *genericHTTPStreamInput) Build() error { if err != nil { return internaldto.NewErroneousExecutorOutput(err) } - execPrim, execErr := composeAsyncMonitor(handlerCtx, dependentInsertPrimitive, prov, m, commentDirectives) + execPrim, execErr := asynccompose.ComposeAsyncMonitor( + handlerCtx, dependentInsertPrimitive, prov, m, commentDirectives, gh.isReturning, gh.insertCtx, gh.drmCfg) if execErr != nil { return internaldto.NewErroneousExecutorOutput(execErr) } @@ -386,6 +398,7 @@ func (gh *genericHTTPStreamInput) Build() error { actionNode := graphHolder.CreatePrimitiveNode(actionPrimitive) graphHolder.NewDependency(gh.dependencyNode, actionNode, 1.0) gh.root = gh.dependencyNode + gh.tail = actionNode return nil } diff --git a/internal/stackql/primitivebuilder/insert_or_update.go b/internal/stackql/primitivebuilder/insert_or_update.go index b90bbd6d..ba7e9510 100644 --- a/internal/stackql/primitivebuilder/insert_or_update.go +++ b/internal/stackql/primitivebuilder/insert_or_update.go @@ -11,6 +11,7 @@ import ( type insertOrUpdate struct { bldrInput builder_input.BuilderInput root primitivegraph.PrimitiveNode + tail primitivegraph.PrimitiveNode } func NewInsertOrUpdate( @@ -26,7 +27,7 @@ func (ss *insertOrUpdate) GetRoot() primitivegraph.PrimitiveNode { } func (ss *insertOrUpdate) GetTail() primitivegraph.PrimitiveNode { - return ss.root + return ss.tail } func (ss *insertOrUpdate) Build() error { @@ -38,6 +39,9 @@ func (ss *insertOrUpdate) Build() error { switch node := node.(type) { case *sqlparser.Insert: mutableInput.SetVerb("insert") + if len(node.SelectExprs) > 0 { + mutableInput.SetIsReturning(true) + } case *sqlparser.Update: mutableInput.SetVerb("update") default: @@ -82,6 +86,7 @@ func (ss *insertOrUpdate) Build() error { return genericBldrErr } ss.root = genericBldr.GetRoot() + ss.tail = genericBldr.GetTail() return nil } diff --git a/internal/stackql/primitivebuilder/mono_valent_builder.go b/internal/stackql/primitivebuilder/mono_valent_builder.go index 57ccfa9b..4ee987c1 100644 --- a/internal/stackql/primitivebuilder/mono_valent_builder.go +++ b/internal/stackql/primitivebuilder/mono_valent_builder.go @@ -31,6 +31,7 @@ type monoValentBuilder struct { root primitivegraph.PrimitiveNode stream streaming.MapStream isReadOnly bool //nolint:unused // TODO: build out + isAwait bool //nolint:unused // TODO: build out monoValentExecutorFactory execution.MonoValentExecutorFactory } @@ -44,6 +45,7 @@ func newMonoValentBuilder( stream streaming.MapStream, isSkipResponse bool, isMutation bool, + isAwait bool, ) Builder { var tcc internaldto.TxnControlCounters if insertCtx != nil { @@ -72,7 +74,7 @@ func newMonoValentBuilder( stream, isSkipResponse, isMutation, - false, + isAwait, ), } } diff --git a/internal/stackql/primitivebuilder/single_acquire_and_select.go b/internal/stackql/primitivebuilder/single_acquire_and_select.go index 0935343a..90dca5ae 100644 --- a/internal/stackql/primitivebuilder/single_acquire_and_select.go +++ b/internal/stackql/primitivebuilder/single_acquire_and_select.go @@ -38,7 +38,9 @@ func NewSingleAcquireAndSelect( insertContainer, insertCtx, rowSort, - nil), + nil, + bldrInput.IsAwait(), + ), selectBuilder: NewSingleSelect( graph, handlerCtx, selectCtx, []tableinsertioncontainer.TableInsertionContainer{insertContainer}, diff --git a/internal/stackql/primitivebuilder/single_select_acquire.go b/internal/stackql/primitivebuilder/single_select_acquire.go index 2df924c4..b82e8af5 100644 --- a/internal/stackql/primitivebuilder/single_select_acquire.go +++ b/internal/stackql/primitivebuilder/single_select_acquire.go @@ -15,6 +15,7 @@ func NewSingleSelectAcquire( insertCtx drm.PreparedStatementCtx, rowSort func(map[string]map[string]interface{}) []string, stream streaming.MapStream, + isAwait bool, ) Builder { tableMeta := insertionContainer.GetTableMetadata() _, isGraphQL := tableMeta.GetGraphQL() @@ -39,5 +40,6 @@ func NewSingleSelectAcquire( stream, false, false, + isAwait, ) } diff --git a/internal/stackql/primitivegenerator/statement_analyzer.go b/internal/stackql/primitivegenerator/statement_analyzer.go index b9e0617d..f2e62cb0 100644 --- a/internal/stackql/primitivegenerator/statement_analyzer.go +++ b/internal/stackql/primitivegenerator/statement_analyzer.go @@ -695,7 +695,7 @@ func (pb *standardPrimitiveGenerator) analyzeExec(pbi planbuilderinput.PlanBuild handlerCtx, insertionContainer, pb.PrimitiveComposer.GetInsertPreparedStatementCtx(), - nil, nil)) + nil, nil, false)) // returning hardcoded to false for now return nil } selIndirect, indirectErr := astindirect.NewParserExecIndirect( @@ -1125,6 +1125,7 @@ func (pb *standardPrimitiveGenerator) AnalyzeInsert(pbi planbuilderinput.PlanBui []anysdk.ColumnDescriptor{}, ) analyser := anysdk.NewMethodAnalyzer() + // TODO: this ought to cater for async methodAnalysisOutput, analysisErr := analyser.AnalyzeUnaryAction(analysisInput) if analysisErr != nil { return analysisErr diff --git a/internal/test/stackqltestutil/helper.go b/internal/test/stackqltestutil/helper.go index e71eb9b9..b3128568 100644 --- a/internal/test/stackqltestutil/helper.go +++ b/internal/test/stackqltestutil/helper.go @@ -33,6 +33,28 @@ func RunStdOutTestAgainstFiles(t *testing.T, testSubject func(*testing.T), possi checkPossibleMatchFiles(t, out, possibleExpectedOutputFiles) } +func RunStdErrTestAgainstFiles(t *testing.T, testSubject func(*testing.T), possibleExpectedOutputFiles []string) { + old := os.Stderr // keep backup of the real stderr + r, w, _ := os.Pipe() + os.Stderr = w + outC := make(chan string) + + testSubject(t) + + // copy the output in a separate goroutine so printing can't block indefinitely + go func() { + var buf bytes.Buffer + io.Copy(&buf, r) //nolint:errcheck // ok for testing + outC <- buf.String() + }() + w.Close() + os.Stderr = old // restoring the real stderr + out := <-outC + t.Logf("outC = %s", out) + + checkPossibleMatchFiles(t, out, possibleExpectedOutputFiles) +} + func checkPossibleMatchFiles(t *testing.T, subject string, possibleExpectedOutputFiles []string) { hasMatchedExpected := false for _, expectedOpFile := range possibleExpectedOutputFiles { diff --git a/stackql/main_integration_test.go b/stackql/main_integration_test.go index c0640800..70c20c65 100644 --- a/stackql/main_integration_test.go +++ b/stackql/main_integration_test.go @@ -79,7 +79,7 @@ func TestK8STemplatedE2eSuccess(t *testing.T) { os.Args = args - stackqltestutil.RunStdOutTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedK8STheHardWayAsyncFile}) + stackqltestutil.RunStdErrTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedK8STheHardWayAsyncFile}) } func TestInsertAwaitExecSuccess(t *testing.T) { @@ -105,7 +105,7 @@ func TestInsertAwaitExecSuccess(t *testing.T) { os.Args = args - stackqltestutil.RunStdOutTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkInsertAsyncFile}) + stackqltestutil.RunStdErrTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkInsertAsyncFile}) } func TestDeleteAwaitSuccess(t *testing.T) { @@ -130,7 +130,7 @@ func TestDeleteAwaitSuccess(t *testing.T) { os.Args = args - stackqltestutil.RunStdOutTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkDeleteAsyncFile}) + stackqltestutil.RunStdErrTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkDeleteAsyncFile}) } func TestDeleteAwaitExecSuccess(t *testing.T) { @@ -156,7 +156,7 @@ func TestDeleteAwaitExecSuccess(t *testing.T) { os.Args = args - stackqltestutil.RunStdOutTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkDeleteAsyncFile}) + stackqltestutil.RunStdErrTestAgainstFiles(t, execStuff, []string{testobjects.ExpectedComputeNetworkDeleteAsyncFile}) } func execStuff(t *testing.T) { diff --git a/test/python/stackql_test_tooling/flask/gcp/app.py b/test/python/stackql_test_tooling/flask/gcp/app.py index 708bb8cd..d703d833 100644 --- a/test/python/stackql_test_tooling/flask/gcp/app.py +++ b/test/python/stackql_test_tooling/flask/gcp/app.py @@ -2,8 +2,12 @@ import logging from flask import Flask, render_template, request, jsonify +import os + app = Flask(__name__) +_IS_DOCKER = True if os.getenv('IS_DOCKER', 'false').lower() == 'true' else False + # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) @@ -32,6 +36,66 @@ def v1_storage_buckets_insert(): return render_template('buckets-insert-generic.jinja.json', bucket_name=bucket_name), 200, {'Content-Type': 'application/json'} return '{"msg": "Disallowed"}', 401, {'Content-Type': 'application/json'} +@app.route('/compute/v1/projects/testing-project/global/networks', methods=['GET']) +def projects_testing_project_global_networks(): + return render_template('route_27_template.json'), 200, {'Content-Type': 'application/json'} + +@app.route('/compute/v1/projects//global/networks', methods=['POST']) +def compute_networks_insert(project_name: str): + # Validate the incoming query + body = request.get_json() + operation_id = '1000000000001' + operation_name = 'operation-100000000001-10000000001-10000001-10000001' + network_name = body['name'] + host_name = 'host.docker.internal' if _IS_DOCKER else 'localhost' + target_link = f'https://{host_name}:1080/compute/v1/projects/{ project_name }/global/networks/{ network_name }' + if not body or 'name' not in body: + return '{"msg": "Invalid request body"}', 400, {'Content-Type': 'application/json'} + if not project_name: + return '{"msg": "Invalid request: project not supplied"}', 400, {'Content-Type': 'application/json'} + if project_name == 'mutable-project' and network_name == 'auto-test-01': + return render_template( + 'global-operation.jinja.json', + target_link=target_link, + operation_id=operation_id, + operation_name=operation_name, + project_name=project_name, + host_name=host_name, + kind='compute#operation', + operation_type='insert', + progress=0, + ), 200, {'Content-Type': 'application/json'} + return '{"msg": "Disallowed"}', 401, {'Content-Type': 'application/json'} + +@app.route('/compute/v1/projects//global/operations/', methods=['GET']) +def projects_testing_project_global_operation_detail(project_name: str, operation_name: str): + if project_name == 'mutable-project' and 'operation-100000000001-10000000001-10000001-10000001': + operation_id = '1000000000001' + network_name = 'auto-test-01' + host_name = 'host.docker.internal' if _IS_DOCKER else 'localhost' + target_link = f'https://{host_name}:1080/compute/v1/projects/{ project_name }/global/networks/{ network_name }' + return render_template( + 'global-operation.jinja.json', + target_link=target_link, + operation_id=operation_id, + operation_name=operation_name, + project_name=project_name, + host_name=host_name, + kind='compute#operation', + operation_type='insert', + progress=100, + end_time='2025-07-05T19:43:34.491-07:00', + ), 200, {'Content-Type': 'application/json'} + return '{"msg": "Disallowed"}', 401, {'Content-Type': 'application/json'} + +@app.route('/compute/v1/projects//global/networks/', methods=['GET']) +def projects_testing_project_global_network_detail(project_name: str, network_name: str): + return render_template( + 'networks-insert-generic-mature.jinja.json', + project_name=project_name, + network_name=network_name + ), 200, {'Content-Type': 'application/json'} + @app.route('/v1/projects/testing-project-three/locations/global/keyRings/testing-three/cryptoKeys', methods=['GET']) def v1_projects_testing_project_three_locations_global_keyRings_testing_three_cryptoKeys(): return render_template('route_1_template.json'), 200, {'Content-Type': 'application/json'} @@ -188,9 +252,6 @@ def projects_testing_project_zones_australia_southeast1_a_disks(): def projects_testing_project_zones_australia_southeast1_b_disks(): return render_template('route_26_template.json'), 200, {'Content-Type': 'application/json'} -@app.route('/compute/v1/projects/testing-project/global/networks', methods=['GET']) -def projects_testing_project_global_networks(): - return render_template('route_27_template.json'), 200, {'Content-Type': 'application/json'} @app.route('/compute/v1/projects/testing-project/regions/australia-southeast1/subnetworks', methods=['GET']) def projects_testing_project_regions_australia_southeast1_subnetworks(): diff --git a/test/python/stackql_test_tooling/flask/gcp/templates/global-operation.jinja.json b/test/python/stackql_test_tooling/flask/gcp/templates/global-operation.jinja.json new file mode 100644 index 00000000..143f6c52 --- /dev/null +++ b/test/python/stackql_test_tooling/flask/gcp/templates/global-operation.jinja.json @@ -0,0 +1,15 @@ +{ + "kind": "{{ kind if kind else 'compute#operation' }}", + "id": "{{ operation_id if operation_id else '1000000000001' }}", + "name": "{{ operation_name }}", + "operationType": "insert", + "targetLink": "{{ target_link }}", + "targetId": "{{ target_id if target_id else '2000000000002' }}", + "status": "{{ 'DONE' if progress and progress > 99 else 'RUNNING' }}", + "user": "krimmer@ryukit.com", + "progress": {{ progress }}, + "insertTime": "2025-07-05T19:42:34.488-07:00", + "startTime": "2025-07-05T19:42:34.491-07:00", + {% if end_time %}"endTime": "{{ end_time }}",{% endif %} + "selfLink": "https://{{ host_name }}:1080/compute/v1/projects/{{ project_name }}/global/operations/{{ operation_name }}" +} \ No newline at end of file diff --git a/test/python/stackql_test_tooling/flask/gcp/templates/networks-insert-generic-mature.jinja.json b/test/python/stackql_test_tooling/flask/gcp/templates/networks-insert-generic-mature.jinja.json new file mode 100644 index 00000000..e95d0453 --- /dev/null +++ b/test/python/stackql_test_tooling/flask/gcp/templates/networks-insert-generic-mature.jinja.json @@ -0,0 +1,14 @@ +{ + "kind": "compute#network", + "id": "{{ network_id if network_id else '1000000000000001' }}", + "creationTimestamp": "2025-07-05T19:42:34.483-07:00", + "name": "{{ network_name }}", + "selfLink": "https://www.googleapis.com/compute/v1/projects/{{ project_name }}/global/networks/{{ network_name }}", + "selfLinkWithId": "https://www.googleapis.com/compute/v1/projects/{{ project_name }}/global/networks/{{ network_id if network_id else '1000000000000001' }}", + "autoCreateSubnetworks": false, + "routingConfig": { + "routingMode": "REGIONAL", + "bgpBestPathSelectionMode": "LEGACY" + }, + "networkFirewallPolicyEnforcementOrder": "AFTER_CLASSIC_FIREWALL" +} diff --git a/test/registry/src/googleapis.com/v0.1.2/resources/compute-v1.yaml b/test/registry/src/googleapis.com/v0.1.2/resources/compute-v1.yaml index 9adeed6c..74377774 100644 --- a/test/registry/src/googleapis.com/v0.1.2/resources/compute-v1.yaml +++ b/test/registry/src/googleapis.com/v0.1.2/resources/compute-v1.yaml @@ -2130,6 +2130,9 @@ resources: response: mediaType: application/json openAPIDocKey: '200' + asyncOverrideMediaType: application/json + async_schema_override: + $ref: '#/components/schemas/Network' list: operation: $ref: 'googleapis.com/v0.1.2/services-split/compute/compute-v1.yaml#/paths/~1projects~1{project}~1global~1networks/get' diff --git a/test/robot/functional/stackql_mocked_from_cmd_line.robot b/test/robot/functional/stackql_mocked_from_cmd_line.robot index 0a22b913..16b49eaf 100644 --- a/test/robot/functional/stackql_mocked_from_cmd_line.robot +++ b/test/robot/functional/stackql_mocked_from_cmd_line.robot @@ -8550,3 +8550,30 @@ Insert Returning Simple Projection ... ${EMPTY} ... stdout=${CURDIR}/tmp/Insert-Returning-Simple-Projection.tmp ... stderr=${CURDIR}/tmp/Insert-Returning-Simple-Projection-stderr.tmp + +Insert Async Returning Simple Projection + [Documentation] Insert a row into a table and return projected new object values. For **asynchronously** created objects. + ${inputStr} = Catenate + ... insert /*+ AWAIT */ into google.compute.networks(project, data__name, data__autoCreateSubnetworks) select 'mutable-project', 'auto-test-01', false returning creationTimestamp, name; + ${outputStr} = Catenate SEPARATOR=\n + ... |-------------------------------|--------------| + ... |${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}creationTimestamp${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}|${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}name${SPACE}${SPACE}${SPACE}${SPACE}${SPACE}| + ... |-------------------------------|--------------| + ... |${SPACE}2025-07-05T19:42:34.483-07:00${SPACE}|${SPACE}auto-test-01${SPACE}| + ... |-------------------------------|--------------| + ${stdErrStr} = Catenate SEPARATOR=\n + ... compute#operation: insert in progress, 10 seconds elapsed + ... compute#operation: insert complete + Should Stackql Exec Inline Equal Both Streams + ... ${STACKQL_EXE} + ... ${OKTA_SECRET_STR} + ... ${GITHUB_SECRET_STR} + ... ${K8S_SECRET_STR} + ... ${REGISTRY_NO_VERIFY_CFG_STR} + ... ${AUTH_CFG_STR} + ... ${SQL_BACKEND_CFG_STR_CANONICAL} + ... ${inputStr} + ... ${outputStr} + ... ${stdErrStr} + ... stdout=${CURDIR}/tmp/Insert-Async-Returning-Simple-Projection.tmp + ... stderr=${CURDIR}/tmp/Insert-Async-Returning-Simple-Projection-stderr.tmp