Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,8 @@ jobs:


dockertest:
env:
IS_DOCKER: 'true'
name: Docker Test
needs:
- dockerbuild
Expand Down
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
"select rings.projectsId as project, rings.locationsId as locale, split_part(rings.name, '/', -1) as key_ring_name, split_part(keys.name, '/', -1) as key_name, json_extract(keys.\"versionTemplate\", '$.algorithm') as key_algorithm, json_extract(keys.\"versionTemplate\", '$.protectionLevel') as key_protection_level from google.cloudkms.key_rings rings inner join google.cloudkms.crypto_keys keys on keys.keyRingsId = split_part(rings.name, '/', -1) and keys.projectsId = rings.projectsId and keys.locationsId = rings.locationsId where rings.projectsId in ('testing-project', 'testing-project-two', 'testing-project-three') and rings.locationsId in ('global', 'australia-southeast1', 'australia-southeast2') order by project, locale, key_name ;",
"delete from aws.cloud_control.resources where region = 'ap-southeast-1' and data__TypeName = 'AWS::Logs::LogGroup' and data__Identifier = 'LogGroupResourceExampleThird' ;",
"insert into google.storage.buckets( project, data__name) select 'testing-project', 'silly-bucket' returning projectNumber;",
"insert /*+ AWAIT */ into google.compute.networks(project, data__name, data__autoCreateSubnetworks) select 'mutable-project', 'auto-test-01', false returning creationTimestamp, name;",
],
"default": "show providers;"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ https://docs.aws.amazon.com/sdk-for-go/api/aws/signer/v4/
`INSERT RETURNING` can function in two mechanisms:

- Synchronous responses, such as [`google.storage.buckets`](https://cloud.google.com/storage/docs/json_api/v1/buckets/insert). The returning clause is a projection on the immediately available reponse body.
- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded.
- Asynchronous responses, such as [`google.compute.instances`](https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert) and [`google.compute.networks`](https://cloud.google.com/compute/docs/reference/rest/v1/networks/insert). The returning clause is a projection on the reponse body **after** the await flow has concluded.

Future use cases for `UPDATE RETURNING`, `REPLACE RETURNING` and `DELETE RETURNING` will function the same observable fashion.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stackql/any-sdk v0.1.3-beta02
github.com/stackql/any-sdk v0.1.4-alpha06
github.com/stackql/go-suffix-map v0.0.1-alpha01
github.com/stackql/psql-wire v0.1.1-beta23
github.com/stackql/stackql-parser v0.0.15-alpha06
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
github.com/stackql/any-sdk v0.1.3-beta02 h1:0jSwyYFddjAN++U+yNNLF7SMLPIIaRhv522UzeWDf2E=
github.com/stackql/any-sdk v0.1.3-beta02/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
github.com/stackql/any-sdk v0.1.4-alpha06 h1:QJPf3ehPrRqmYZR+TmD897AsmsaOamHErLhaE5B9v/w=
github.com/stackql/any-sdk v0.1.4-alpha06/go.mod h1:AKS/g28y7m4SWL/YW8veE9MCNy8XJgaicVibemVE9e8=
github.com/stackql/go-suffix-map v0.0.1-alpha01 h1:TDUDS8bySu41Oo9p0eniUeCm43mnRM6zFEd6j6VUaz8=
github.com/stackql/go-suffix-map v0.0.1-alpha01/go.mod h1:QAi+SKukOyf4dBtWy8UMy+hsXXV+yyEE4vmBkji2V7g=
github.com/stackql/psql-wire v0.1.1-beta23 h1:1ayYMjZArfDcIMyEOKnm+Bp1zRCISw8pguvTFuUhhVQ=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
package primitivebuilder
package asynccompose

import (
"github.com/stackql/any-sdk/anysdk"
"github.com/stackql/stackql-parser/go/vt/sqlparser"
"github.com/stackql/stackql/internal/stackql/drm"
"github.com/stackql/stackql/internal/stackql/handler"
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
"github.com/stackql/stackql/internal/stackql/primitive"
"github.com/stackql/stackql/internal/stackql/provider"
)

func composeAsyncMonitor(
func ComposeAsyncMonitor(
handlerCtx handler.HandlerContext,
precursor primitive.IPrimitive,
prov provider.IProvider,
method anysdk.OperationStore,
commentDirectives sqlparser.CommentDirectives,
isReturning bool,
insertCtx drm.PreparedStatementCtx,
drmCfg drm.Config,
) (primitive.IPrimitive, error) {
asm, err := NewAsyncMonitor(handlerCtx, prov, method)
asm, err := NewAsyncMonitor(handlerCtx, prov, method, isReturning)
if err != nil {
return nil, err
}
Expand All @@ -31,7 +35,8 @@ func composeAsyncMonitor(
handlerCtx.GetOutfile(),
handlerCtx.GetOutErrFile(),
)
primitive, err := asm.GetMonitorPrimitive(prov, method, precursor, pl, commentDirectives)
primitive, err := asm.GetMonitorPrimitive(
prov, method, precursor, pl, commentDirectives, isReturning, insertCtx, drmCfg)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package primitivebuilder
package asynccompose

import (
"fmt"
"strings"

"github.com/stackql/any-sdk/anysdk"
"github.com/stackql/stackql/internal/stackql/acid/binlog"
"github.com/stackql/stackql/internal/stackql/drm"
"github.com/stackql/stackql/internal/stackql/execution"
"github.com/stackql/stackql/internal/stackql/handler"
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
Expand All @@ -22,6 +23,9 @@ type IAsyncMonitor interface {
precursor primitive.IPrimitive,
initialCtx primitive.IPrimitiveCtx,
comments sqlparser.CommentDirectives,
isReturning bool,
insertCtx drm.PreparedStatementCtx,
drmCfg drm.Config,
) (primitive.IPrimitive, error)
}

Expand Down Expand Up @@ -120,11 +124,12 @@ func NewAsyncMonitor(
handlerCtx handler.HandlerContext,
prov provider.IProvider,
op anysdk.OperationStore,
isReturning bool,
) (IAsyncMonitor, error) {
//nolint:gocritic //TODO: refactor
switch prov.GetProviderString() {
case "google":
return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion())
return newGoogleAsyncMonitor(handlerCtx, prov, op, prov.GetVersion(), isReturning)
}
return nil, fmt.Errorf(
"async operation monitor for provider = '%s', api version = '%s' currently not supported",
Expand All @@ -136,6 +141,7 @@ func newGoogleAsyncMonitor(
prov provider.IProvider,
op anysdk.OperationStore,
version string, //nolint:unparam // TODO: refactor
isReturning bool, //nolint:unparam,revive // TODO: refactor
) (IAsyncMonitor, error) {
//nolint:gocritic //TODO: refactor
switch version {
Expand All @@ -160,11 +166,14 @@ func (gm *DefaultGoogleAsyncMonitor) GetMonitorPrimitive(
precursor primitive.IPrimitive,
initialCtx primitive.IPrimitiveCtx,
comments sqlparser.CommentDirectives,
isReturning bool,
insertCtx drm.PreparedStatementCtx,
drmCfg drm.Config,
) (primitive.IPrimitive, error) {
//nolint:gocritic,staticcheck //TODO: refactor
switch strings.ToLower(prov.GetVersion()) {
default:
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments)
return gm.getV1Monitor(prov, op, precursor, initialCtx, comments, isReturning, insertCtx, drmCfg)
}
}

Expand All @@ -174,6 +183,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
precursor primitive.IPrimitive,
initialCtx primitive.IPrimitiveCtx,
comments sqlparser.CommentDirectives,
isReturning bool,
insertCtx drm.PreparedStatementCtx,
drmCfg drm.Config,
) (primitive.IPrimitive, error) {
provider, providerErr := prov.GetProvider()
if providerErr != nil {
Expand All @@ -186,6 +198,9 @@ func (gm *DefaultGoogleAsyncMonitor) getV1Monitor(
precursor,
initialCtx,
comments,
isReturning,
insertCtx,
drmCfg,
)
if exPrepErr != nil {
return nil, exPrepErr
Expand Down
1 change: 1 addition & 0 deletions internal/stackql/dependencyplanner/dependencyplanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (dp *standardDependencyPlanner) orchestrate(
insPsc,
nil,
outStream,
false, // returning hardcoded to false for now
)
}
dp.execSlice = append(dp.execSlice, builder)
Expand Down
4 changes: 2 additions & 2 deletions internal/stackql/driver/dependent_simple_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsync(t *testing.T) {
if err != nil {
t.Fatalf("Test failed: %v", err)
}
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
if err != nil {
t.Fatalf("Test failed: %v", err)
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestSimpleInsertDependentGoogleComputeDiskAsyncReversed(t *testing.T) {
if err != nil {
t.Fatalf("Test failed: %v", err)
}
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, rdr, lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
if err != nil {
t.Fatalf("Test failed: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/stackql/driver/driver_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestSimpleInsertGoogleComputeNetworkAsync(t *testing.T) {
}

testSubject := func(t *testing.T, outFile *bufio.Writer) {
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
if err != nil {
t.Fatalf("Test failed: %v", err)
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestK8sTheHardWayAsync(t *testing.T) {
runtimeCtx.InfilePath = k8sthwRenderedFile
runtimeCtx.CSVHeadersDisable = true

handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdOut(outFile), true)
handlerCtx, err := entryutil.BuildHandlerContext(*runtimeCtx, strings.NewReader(""), lrucache.NewLRUCache(int64(runtimeCtx.QueryCacheSize)), inputBundle.WithStdErr(outFile), true)
if err != nil {
t.Fatalf("Test failed: %v", err)
}
Expand Down
96 changes: 93 additions & 3 deletions internal/stackql/execution/mono_valent_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,7 @@ func (sp *standardProcessor) Process() ProcessorResponse {
if httpResponseErr != nil {
return newHTTPProcessorResponse(nil, reversalStream, false, httpResponseErr)
}
// TODO: add async monitor here
processed, resErr := method.ProcessResponse(httpResponse)
if resErr != nil {
if isSkipResponse && isMutation && httpResponse.StatusCode < 300 {
Expand Down Expand Up @@ -1444,6 +1445,32 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
return ex, nil
}

func shimProcessHTTP(
url string,
rtCtx dto.RuntimeCtx,
authCtx *dto.AuthCtx,
provider anysdk.Provider,
m anysdk.OperationStore,
outErrFile io.Writer,
) (*http.Response, error) {
req, monitorReqErr := anysdk.GetMonitorRequest(url)
if monitorReqErr != nil {
return nil, monitorReqErr
}
cc := anysdk.NewAnySdkClientConfigurator(rtCtx, provider.GetName())
anySdkResponse, apiErr := anysdk.CallFromSignature(
cc, rtCtx, authCtx, authCtx.Type, false, outErrFile, provider, anysdk.NewAnySdkOpStoreDesignation(m), req)

if apiErr != nil {
return nil, apiErr
}
httpResponse, httpResponseErr := anySdkResponse.GetHttpResponse()
if httpResponseErr != nil {
return nil, httpResponseErr
}
return httpResponse, nil
}

//nolint:funlen,gocognit // acceptable for now
func GetMonitorExecutor(
handlerCtx handler.HandlerContext,
Expand All @@ -1452,6 +1479,9 @@ func GetMonitorExecutor(
precursor primitive.IPrimitive,
initialCtx primitive.IPrimitiveCtx,
comments sqlparser.CommentDirectives,
isReturning bool,
insertCtx drm.PreparedStatementCtx,
drmCfg drm.Config,
) (primitive.IPrimitive, error) {
m := op
// tableName, err := mv.tableMeta.GetTableName()
Expand All @@ -1471,6 +1501,8 @@ func GetMonitorExecutor(
elapsedSeconds: 0,
pollIntervalSeconds: MonitorPollIntervalSeconds,
comments: comments,
insertCtx: insertCtx,
drmCfg: drmCfg,
}
if comments != nil {
asyncPrim.noStatus = comments.IsSet("NOSTATUS")
Expand Down Expand Up @@ -1498,7 +1530,64 @@ func GetMonitorExecutor(

operationDescriptor := getOpDescriptor(body)
endTime, endTimeOk := body["endTime"]
prStr := provider.GetName()
//nolint:nestif // acceptable for now
if endTimeOk && endTime != "" {
targetLink, targetLinkOK := body["targetLink"]
if targetLinkOK && isReturning {
authCtx, authErr := pc.GetAuthContext(prStr)
if authErr != nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr)
}
if authCtx == nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, fmt.Errorf("cannot execute monitor: no auth context"))
}
targetLinkStr, targetLinkStrOk := targetLink.(string)
if !targetLinkStrOk {
return internaldto.NewExecutorOutput(
nil,
nil,
nil,
nil,
fmt.Errorf("cannot execute monitor: 'targetLink' is not a string"),
)
}
httpResponse, httpResponseErr := shimProcessHTTP(
targetLinkStr,
rtCtx,
authCtx,
provider,
m,
outErrFile,
)
if httpResponseErr != nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, httpResponseErr)
}

if httpResponse != nil && httpResponse.Body != nil {
defer httpResponse.Body.Close()
}
target, targetErr := m.DeprecatedProcessResponse(httpResponse)
handlerCtx.LogHTTPResponseMap(target)
if targetErr != nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, targetErr)
}
// TODO: insert into table here
if isReturning {
if asyncPrim.insertCtx != nil {
_, rErr := asyncPrim.drmCfg.ExecuteInsertDML(
handlerCtx.GetSQLEngine(),
asyncPrim.insertCtx,
target,
"", // TODO: figure out how on earth to compute this encoding
)
if rErr != nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, rErr)
}
}
}
return prepareResultSet(&asyncPrim, pc, target, operationDescriptor)
}
return prepareResultSet(&asyncPrim, pc, body, operationDescriptor)
}
url, ok := body["selfLink"]
Expand All @@ -1511,7 +1600,6 @@ func GetMonitorExecutor(
fmt.Errorf("cannot execute monitor: no 'selfLink' property present"),
)
}
prStr := provider.GetName()
authCtx, authErr := pc.GetAuthContext(prStr)
if authErr != nil {
return internaldto.NewExecutorOutput(nil, nil, nil, nil, authErr)
Expand All @@ -1523,7 +1611,7 @@ func GetMonitorExecutor(
asyncPrim.elapsedSeconds += asyncPrim.pollIntervalSeconds
if !asyncPrim.noStatus {
//nolint:errcheck //TODO: handle error
pc.GetWriter().Write(
pc.GetErrWriter().Write(
[]byte(
fmt.Sprintf(
"%s in progress, %d seconds elapsed",
Expand Down Expand Up @@ -1649,6 +1737,8 @@ type asyncHTTPMonitorPrimitive struct {
noStatus bool
id int64
comments sqlparser.CommentDirectives
insertCtx drm.PreparedStatementCtx
drmCfg drm.Config
}

func (pr *asyncHTTPMonitorPrimitive) SetTxnID(_ int) {
Expand Down Expand Up @@ -1762,7 +1852,7 @@ func prepareResultSet(
}
if !prim.noStatus {
//nolint:errcheck //TODO: handle error
pc.GetWriter().Write([]byte(fmt.Sprintf("%s complete", operationDescriptor) + fmt.Sprintln("")))
pc.GetErrWriter().Write([]byte(fmt.Sprintf("%s complete", operationDescriptor) + fmt.Sprintln("")))
}
return util.PrepareResultSet(payload)
}
Expand Down
Loading
Loading