From 40e3c769a6812b8b6a443ef38acd78bdd10a9459 Mon Sep 17 00:00:00 2001 From: Johannes Malsam Date: Fri, 10 Apr 2026 13:53:47 +0200 Subject: [PATCH 1/5] change psql lock logic to take lock ID as a parameter --- central/dblock/dblock.go | 57 +++++++++++++++++++ .../dblock/dblock_test.go | 22 +++---- migrator/lock/lock.go | 43 +------------- 3 files changed, 71 insertions(+), 51 deletions(-) create mode 100644 central/dblock/dblock.go rename migrator/lock/lock_test.go => central/dblock/dblock_test.go (76%) diff --git a/central/dblock/dblock.go b/central/dblock/dblock.go new file mode 100644 index 0000000000000..c9a6238357630 --- /dev/null +++ b/central/dblock/dblock.go @@ -0,0 +1,57 @@ +package dblock + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/postgres" +) + +var ( + log = logging.LoggerForModule() +) + +// TryAcquireAdvisoryLock attempts to acquire a PostgreSQL advisory lock with the given ID without blocking. +// Returns whether the lock was acquired, a release function (nil if not acquired), and any error. +func TryAcquireAdvisoryLock(ctx context.Context, pool postgres.DB, lockID int64) (bool, func(), error) { + conn, err := pool.Acquire(ctx) + if err != nil { + return false, nil, errors.Wrap(err, "acquiring connection for advisory lock") + } + + var acquired bool + err = conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired) + if err != nil { + conn.Release() + return false, nil, errors.Wrap(err, "trying advisory lock") + } + + if !acquired { + conn.Release() + return false, nil, nil + } + + log.Infof("Advisory lock %d acquired.", lockID) + return true, makeRelease(conn, lockID), nil +} + +func makeRelease(conn *postgres.Conn, lockID int64) func() { + released := false + return func() { + if released { + return + } + released = true + unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer unlockCancel() + _, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID) + if err != nil { + log.Errorf("Failed to release advisory lock %d: %v", lockID, err) + } else { + log.Infof("Advisory lock %d released.", lockID) + } + conn.Release() + } +} diff --git a/migrator/lock/lock_test.go b/central/dblock/dblock_test.go similarity index 76% rename from migrator/lock/lock_test.go rename to central/dblock/dblock_test.go index ef19aaa531ac5..6c34a9cdf0bea 100644 --- a/migrator/lock/lock_test.go +++ b/central/dblock/dblock_test.go @@ -1,6 +1,6 @@ //go:build sql_integration -package lock +package dblock import ( "context" @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/suite" ) +const testLockID int64 = 999_999_999 + type AdvisoryLockSuite struct { suite.Suite pool postgres.DB @@ -45,14 +47,14 @@ func (s *AdvisoryLockSuite) TearDownTest() { } func (s *AdvisoryLockSuite) TestTryAcquireAndRelease() { - acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired) s.Require().NotNil(release) release() - acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired2) s.Require().NotNil(release2) @@ -60,13 +62,13 @@ func (s *AdvisoryLockSuite) TestTryAcquireAndRelease() { } func (s *AdvisoryLockSuite) TestMutualExclusion() { - acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired) s.Require().NotNil(release) - // TryAcquire should fail because lock already held by other connection. - acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool) + // TryAcquire should fail because lock already held by other connection. + acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().False(acquired2) s.Require().Nil(release2) @@ -75,12 +77,12 @@ func (s *AdvisoryLockSuite) TestMutualExclusion() { } func (s *AdvisoryLockSuite) TestReleaseAllowsReacquire() { - acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired) release() - acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired2) s.Require().NotNil(release2) @@ -99,7 +101,7 @@ func (s *AdvisoryLockSuite) TestConcurrentTryAcquire() { for i := 0; i < numGoroutines; i++ { go func(idx int) { defer wg.Done() - results[idx], releases[idx], errs[idx] = TryAcquireMigrationLock(s.ctx, s.pool) + results[idx], releases[idx], errs[idx] = TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) }(i) } wg.Wait() @@ -117,7 +119,7 @@ func (s *AdvisoryLockSuite) TestConcurrentTryAcquire() { } func (s *AdvisoryLockSuite) TestDoubleReleaseIsIdempotent() { - acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool) + acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID) s.Require().NoError(err) s.Require().True(acquired) diff --git a/migrator/lock/lock.go b/migrator/lock/lock.go index 2377a347576ab..110967541a03c 100644 --- a/migrator/lock/lock.go +++ b/migrator/lock/lock.go @@ -2,10 +2,8 @@ package lock import ( "context" - "time" - "github.com/pkg/errors" - "github.com/stackrox/rox/migrator/log" + "github.com/stackrox/rox/central/dblock" "github.com/stackrox/rox/pkg/postgres" ) @@ -18,42 +16,5 @@ const ( // TryAcquireMigrationLock attempts to acquire the migration advisory lock without blocking. // Returns whether the lock was acquired, a release function (nil if not acquired), and any error. func TryAcquireMigrationLock(ctx context.Context, pool postgres.DB) (bool, func(), error) { - conn, err := pool.Acquire(ctx) - if err != nil { - return false, nil, errors.Wrap(err, "acquiring connection for migration lock") - } - - var acquired bool - err = conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", migrationAdvisoryLockID).Scan(&acquired) - if err != nil { - conn.Release() - return false, nil, errors.Wrap(err, "trying migration advisory lock") - } - - if !acquired { - conn.Release() - return false, nil, nil - } - - log.WriteToStderr("Migration advisory lock acquired.") - return true, makeRelease(conn), nil -} - -func makeRelease(conn *postgres.Conn) func() { - released := false - return func() { - if released { - return - } - released = true - unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer unlockCancel() - _, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", migrationAdvisoryLockID) - if err != nil { - log.WriteToStderrf("Warning: failed to release migration advisory lock: %v", err) - } else { - log.WriteToStderr("Migration advisory lock released.") - } - conn.Release() - } + return dblock.TryAcquireAdvisoryLock(ctx, pool, migrationAdvisoryLockID) } From f2b19b02abc81d68ccc4c135873698f003cb30b8 Mon Sep 17 00:00:00 2001 From: Johannes Malsam Date: Fri, 10 Apr 2026 14:17:56 +0200 Subject: [PATCH 2/5] move it to pkg/dblock instead --- migrator/lock/lock.go | 2 +- {central => pkg}/dblock/dblock.go | 0 {central => pkg}/dblock/dblock_test.go | 0 3 files changed, 1 insertion(+), 1 deletion(-) rename {central => pkg}/dblock/dblock.go (100%) rename {central => pkg}/dblock/dblock_test.go (100%) diff --git a/migrator/lock/lock.go b/migrator/lock/lock.go index 110967541a03c..fc41a38845b05 100644 --- a/migrator/lock/lock.go +++ b/migrator/lock/lock.go @@ -3,7 +3,7 @@ package lock import ( "context" - "github.com/stackrox/rox/central/dblock" + "github.com/stackrox/rox/pkg/dblock" "github.com/stackrox/rox/pkg/postgres" ) diff --git a/central/dblock/dblock.go b/pkg/dblock/dblock.go similarity index 100% rename from central/dblock/dblock.go rename to pkg/dblock/dblock.go diff --git a/central/dblock/dblock_test.go b/pkg/dblock/dblock_test.go similarity index 100% rename from central/dblock/dblock_test.go rename to pkg/dblock/dblock_test.go From 9c9a412eb1a80917b0b78f42e79433123c8b0ff9 Mon Sep 17 00:00:00 2001 From: Johannes Malsam Date: Fri, 10 Apr 2026 14:21:23 +0200 Subject: [PATCH 3/5] add pkg/dblock to allowed imports --- tools/roxvet/analyzers/validateimports/analyzer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/roxvet/analyzers/validateimports/analyzer.go b/tools/roxvet/analyzers/validateimports/analyzer.go index 174d40b9edf38..1aa247fc064f1 100644 --- a/tools/roxvet/analyzers/validateimports/analyzer.go +++ b/tools/roxvet/analyzers/validateimports/analyzer.go @@ -272,6 +272,7 @@ func verifyImportsFromAllowedPackagesOnly(pass *analysis.Pass, imports []*ast.Im "pkg/cvss/cvssv2", "pkg/cvss/cvssv3", "pkg/db", + "pkg/dblock", "pkg/dberrors", "pkg/dbhelper", "pkg/defaults/policies", From 7451625b35fba09c423d972499ec40570be59b06 Mon Sep 17 00:00:00 2001 From: Johannes Malsam Date: Mon, 13 Apr 2026 10:22:57 +0200 Subject: [PATCH 4/5] change makeRelease to use sync.Once --- pkg/dblock/dblock.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/dblock/dblock.go b/pkg/dblock/dblock.go index c9a6238357630..7109aa0b9b4d1 100644 --- a/pkg/dblock/dblock.go +++ b/pkg/dblock/dblock.go @@ -2,6 +2,7 @@ package dblock import ( "context" + "sync" "time" "github.com/pkg/errors" @@ -38,20 +39,18 @@ func TryAcquireAdvisoryLock(ctx context.Context, pool postgres.DB, lockID int64) } func makeRelease(conn *postgres.Conn, lockID int64) func() { - released := false + once := sync.Once{} return func() { - if released { - return - } - released = true - unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer unlockCancel() - _, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID) - if err != nil { - log.Errorf("Failed to release advisory lock %d: %v", lockID, err) - } else { - log.Infof("Advisory lock %d released.", lockID) - } - conn.Release() + once.Do(func() { + unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer unlockCancel() + _, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID) + if err != nil { + log.Errorf("Failed to release advisory lock %d: %v", lockID, err) + } else { + log.Infof("Advisory lock %d released.", lockID) + } + conn.Release() + }) } } From f2e386fef405a11fdb9a05d109f632af4761c0a9 Mon Sep 17 00:00:00 2001 From: Johannes Malsam Date: Mon, 13 Apr 2026 14:40:10 +0200 Subject: [PATCH 5/5] use stackrox sync --- pkg/dblock/dblock.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/dblock/dblock.go b/pkg/dblock/dblock.go index 7109aa0b9b4d1..a4d8690dae708 100644 --- a/pkg/dblock/dblock.go +++ b/pkg/dblock/dblock.go @@ -2,12 +2,13 @@ package dblock import ( "context" - "sync" + "time" "github.com/pkg/errors" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/postgres" + "github.com/stackrox/rox/pkg/sync" ) var (