From 37461f4181a15d054881e5dc17c08242c28dd5aa Mon Sep 17 00:00:00 2001 From: Mladen Todorovic Date: Mon, 17 Mar 2025 11:26:53 +0100 Subject: [PATCH] Change run query for schema to use cursors only when required --- pkg/search/postgres/common.go | 129 ++++++++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 28 deletions(-) diff --git a/pkg/search/postgres/common.go b/pkg/search/postgres/common.go index 20678713f2469..3114cfe405466 100644 --- a/pkg/search/postgres/common.go +++ b/pkg/search/postgres/common.go @@ -52,6 +52,13 @@ var ( const cursorBatchSize = 1000 +type cursorSession struct { + id string + tx *postgres.Tx + + close func() +} + // QueryType describe what type of query to execute // //go:generate stringer -type=QueryType @@ -975,6 +982,8 @@ func retryableRunGetManyQueryForSchema[T any, PT pgutils.Unmarshaler[T]](ctx con } // RunGetManyQueryForSchema executes a request for just the search against the database and unmarshal it to given type. +// +// Deprecated: use RunQueryForSchemaFn instead func RunGetManyQueryForSchema[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) ([]*T, error) { if q == nil { q = searchPkg.EmptyQuery() @@ -994,63 +1003,127 @@ func RunGetManyQueryForSchema[T any, PT pgutils.Unmarshaler[T]](ctx context.Cont }) } -// RunCursorQueryForSchemaFn creates a cursor against the database -func RunCursorQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error { +func prepareQuery(ctx context.Context, schema *walker.Schema, q *v1.Query) (*query, error) { if q == nil { q = searchPkg.EmptyQuery() } - query, err := standardizeQueryAndPopulatePath(ctx, q, schema, GET) + preparedQuery, err := standardizeQueryAndPopulatePath(ctx, q, schema, GET) if err != nil { - return errors.Wrap(err, "error creating query") + return nil, errors.Wrap(err, "error creating query") } - if query == 
nil { - return emptyQueryErr + if preparedQuery == nil { + return nil, emptyQueryErr } - queryStr := query.AsSQL() + return preparedQuery, nil +} - ctx, cancel := contextutil.ContextWithTimeoutIfNotExists(ctx, cursorDefaultTimeout) - defer cancel() +func handleRowsWithCallback[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, rows pgx.Rows, callback func(obj PT) error) (int64, error) { + var data []byte + tag, err := pgx.ForEachRow(rows, []any{&data}, func() error { + if ctx.Err() != nil { + return errors.Wrap(ctx.Err(), "iterating over rows") + } + + msg := new(T) + if errUnmarshal := PT(msg).UnmarshalVTUnsafe(data); errUnmarshal != nil { + return errUnmarshal + } + return callback(msg) + }) + + return tag.RowsAffected(), err +} + +func retryableGetRows(ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) (*tracedRows, error) { + preparedQuery, err := prepareQuery(ctx, schema, q) + if err != nil { + return nil, err + } + + queryStr := preparedQuery.AsSQL() + return tracedQuery(ctx, db, queryStr, preparedQuery.Data...) 
+} + +func RunQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error { + rows, err := pgutils.Retry2(ctx, func() (*tracedRows, error) { + return retryableGetRows(ctx, schema, q, db) + }) + if err != nil { + return err + } + + _, err = handleRowsWithCallback(ctx, rows, callback) + if err != nil { + return errors.Wrap(err, "processing rows") + } + + return nil +} + +func retryableGetCursorSession(ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) (*cursorSession, error) { + preparedQuery, err := prepareQuery(ctx, schema, q) + if err != nil { + return nil, err + } + + queryStr := preparedQuery.AsSQL() tx, err := db.Begin(ctx) if err != nil { - return errors.Wrap(err, "creating transaction") + return nil, errors.Wrap(err, "creating transaction") } - defer func() { + + // We have to ensure that the cleanup function is called if we exit early. + cleanupFunc := func() { if err := tx.Commit(ctx); err != nil { log.Errorf("error committing cursor transaction: %v", err) } - }() + } cursorSuffix := random.GenerateString(16, random.CaseInsensitiveAlpha) - cursor := stringutils.JoinNonEmpty("_", query.From, cursorSuffix) - _, err = tx.Exec(ctx, fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursor, queryStr), query.Data...) + cursorId := stringutils.JoinNonEmpty("_", preparedQuery.From, cursorSuffix) + + _, err = tx.Exec(ctx, fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorId, queryStr), preparedQuery.Data...) 
+ if err != nil { + cleanupFunc() + return nil, errors.Wrap(err, "creating cursor") + } + + cursor := cursorSession{ + id: cursorId, + tx: tx, + close: cleanupFunc, + } + + return &cursor, nil +} + +func RunCursorQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error { + ctx, cancel := contextutil.ContextWithTimeoutIfNotExists(ctx, cursorDefaultTimeout) + defer cancel() + + cursor, err := pgutils.Retry2(ctx, func() (*cursorSession, error) { + return retryableGetCursorSession(ctx, schema, q, db) + }) if err != nil { - return errors.Wrap(err, "creating cursor") + return errors.Wrap(err, "prepare cursor") } + defer cursor.close() for { - rows, err := tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", cursorBatchSize, cursor)) + rows, err := cursor.tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", cursorBatchSize, cursor.id)) if err != nil { return errors.Wrap(err, "advancing in cursor") } - var data []byte - tag, err := pgx.ForEachRow(rows, []any{&data}, func() error { - if ctx.Err() != nil { - return errors.Wrap(ctx.Err(), "iterating over rows") - } - msg := new(T) - if err := PT(msg).UnmarshalVTUnsafe(data); err != nil { - return err - } - return callback(msg) - }) + rowsAffected, err := handleRowsWithCallback(ctx, rows, callback) if err != nil { return errors.Wrap(err, "processing rows") } - if tag.RowsAffected() != cursorBatchSize { + + if rowsAffected != cursorBatchSize { return nil } }