Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 101 additions & 28 deletions pkg/search/postgres/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@

const cursorBatchSize = 1000

type cursorSession struct {
id string
tx *postgres.Tx

close func()
}

// QueryType describe what type of query to execute
//
//go:generate stringer -type=QueryType
Expand Down Expand Up @@ -975,6 +982,8 @@
}

// RunGetManyQueryForSchema executes a request for just the search against the database and unmarshal it to given type.
//
// Deprecated: use RunQueryForSchemaFn instead
func RunGetManyQueryForSchema[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) ([]*T, error) {
if q == nil {
q = searchPkg.EmptyQuery()
Expand All @@ -994,63 +1003,127 @@
})
}

// RunCursorQueryForSchemaFn creates a cursor against the database
func RunCursorQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error {
func prepareQuery(ctx context.Context, schema *walker.Schema, q *v1.Query) (*query, error) {
if q == nil {
q = searchPkg.EmptyQuery()
}

query, err := standardizeQueryAndPopulatePath(ctx, q, schema, GET)
preparedQuery, err := standardizeQueryAndPopulatePath(ctx, q, schema, GET)
if err != nil {
return errors.Wrap(err, "error creating query")
return nil, errors.Wrap(err, "error creating query")

Check warning on line 1013 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1013

Added line #L1013 was not covered by tests
}
if query == nil {
return emptyQueryErr
if preparedQuery == nil {
return nil, emptyQueryErr

Check warning on line 1016 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1016

Added line #L1016 was not covered by tests
}

queryStr := query.AsSQL()
return preparedQuery, nil
}

ctx, cancel := contextutil.ContextWithTimeoutIfNotExists(ctx, cursorDefaultTimeout)
defer cancel()
func handleRowsWithCallback[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, rows pgx.Rows, callback func(obj PT) error) (int64, error) {
var data []byte
tag, err := pgx.ForEachRow(rows, []any{&data}, func() error {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "iterating over rows")
}

msg := new(T)
if errUnmarshal := PT(msg).UnmarshalVTUnsafe(data); errUnmarshal != nil {
return errUnmarshal
}

Check warning on line 1032 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1031-L1032

Added lines #L1031 - L1032 were not covered by tests
return callback(msg)
})

return tag.RowsAffected(), err
}

func retryableGetRows(ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) (*tracedRows, error) {
preparedQuery, err := prepareQuery(ctx, schema, q)
if err != nil {
return nil, err
}

Check warning on line 1043 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1039-L1043

Added lines #L1039 - L1043 were not covered by tests

queryStr := preparedQuery.AsSQL()
return tracedQuery(ctx, db, queryStr, preparedQuery.Data...)

Check warning on line 1046 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1045-L1046

Added lines #L1045 - L1046 were not covered by tests
}

func RunQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error {
rows, err := pgutils.Retry2(ctx, func() (*tracedRows, error) {
return retryableGetRows(ctx, schema, q, db)
})
if err != nil {
return err
}

Check warning on line 1055 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1049-L1055

Added lines #L1049 - L1055 were not covered by tests

_, err = handleRowsWithCallback(ctx, rows, callback)
if err != nil {
return errors.Wrap(err, "processing rows")
}

Check warning on line 1060 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1057-L1060

Added lines #L1057 - L1060 were not covered by tests

return nil

Check warning on line 1062 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1062

Added line #L1062 was not covered by tests
}

func retryableGetCursorSession(ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB) (*cursorSession, error) {
preparedQuery, err := prepareQuery(ctx, schema, q)
if err != nil {
return nil, err
}

Check warning on line 1069 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1068-L1069

Added lines #L1068 - L1069 were not covered by tests

queryStr := preparedQuery.AsSQL()

tx, err := db.Begin(ctx)
if err != nil {
return errors.Wrap(err, "creating transaction")
return nil, errors.Wrap(err, "creating transaction")

Check warning on line 1075 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1075

Added line #L1075 was not covered by tests
}
defer func() {

// We have to ensure that cleanup function is called if exit early.
cleanupFunc := func() {
if err := tx.Commit(ctx); err != nil {
log.Errorf("error committing cursor transaction: %v", err)
}
}()
}

cursorSuffix := random.GenerateString(16, random.CaseInsensitiveAlpha)
cursor := stringutils.JoinNonEmpty("_", query.From, cursorSuffix)
_, err = tx.Exec(ctx, fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursor, queryStr), query.Data...)
cursorId := stringutils.JoinNonEmpty("_", preparedQuery.From, cursorSuffix)

_, err = tx.Exec(ctx, fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorId, queryStr), preparedQuery.Data...)
if err != nil {
cleanupFunc()
return nil, errors.Wrap(err, "creating cursor")
}

Check warning on line 1092 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1090-L1092

Added lines #L1090 - L1092 were not covered by tests

cursor := cursorSession{
id: cursorId,
tx: tx,
close: cleanupFunc,
}

return &cursor, nil
}

func RunCursorQueryForSchemaFn[T any, PT pgutils.Unmarshaler[T]](ctx context.Context, schema *walker.Schema, q *v1.Query, db postgres.DB, callback func(obj PT) error) error {
ctx, cancel := contextutil.ContextWithTimeoutIfNotExists(ctx, cursorDefaultTimeout)
defer cancel()

cursor, err := pgutils.Retry2(ctx, func() (*cursorSession, error) {
return retryableGetCursorSession(ctx, schema, q, db)
})
if err != nil {
return errors.Wrap(err, "creating cursor")
return errors.Wrap(err, "prepare cursor")

Check warning on line 1111 in pkg/search/postgres/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/search/postgres/common.go#L1111

Added line #L1111 was not covered by tests
}
defer cursor.close()

for {
rows, err := tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", cursorBatchSize, cursor))
rows, err := cursor.tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", cursorBatchSize, cursor.id))
if err != nil {
return errors.Wrap(err, "advancing in cursor")
}

var data []byte
tag, err := pgx.ForEachRow(rows, []any{&data}, func() error {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "iterating over rows")
}
msg := new(T)
if err := PT(msg).UnmarshalVTUnsafe(data); err != nil {
return err
}
return callback(msg)
})
rowsAffected, err := handleRowsWithCallback(ctx, rows, callback)
if err != nil {
return errors.Wrap(err, "processing rows")
}
if tag.RowsAffected() != cursorBatchSize {

if rowsAffected != cursorBatchSize {
return nil
}
}
Expand Down
Loading