Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 2 additions & 41 deletions migrator/lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package lock

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/stackrox/rox/migrator/log"
"github.com/stackrox/rox/pkg/dblock"
"github.com/stackrox/rox/pkg/postgres"
)

Expand All @@ -18,42 +16,5 @@ const (
// TryAcquireMigrationLock attempts to acquire the migration advisory lock without blocking.
// Returns whether the lock was acquired, a release function (nil if not acquired), and any error.
func TryAcquireMigrationLock(ctx context.Context, pool postgres.DB) (bool, func(), error) {
conn, err := pool.Acquire(ctx)
if err != nil {
return false, nil, errors.Wrap(err, "acquiring connection for migration lock")
}

var acquired bool
err = conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", migrationAdvisoryLockID).Scan(&acquired)
if err != nil {
conn.Release()
return false, nil, errors.Wrap(err, "trying migration advisory lock")
}

if !acquired {
conn.Release()
return false, nil, nil
}

log.WriteToStderr("Migration advisory lock acquired.")
return true, makeRelease(conn), nil
}

func makeRelease(conn *postgres.Conn) func() {
released := false
return func() {
if released {
return
}
released = true
unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer unlockCancel()
_, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", migrationAdvisoryLockID)
if err != nil {
log.WriteToStderrf("Warning: failed to release migration advisory lock: %v", err)
} else {
log.WriteToStderr("Migration advisory lock released.")
}
conn.Release()
}
return dblock.TryAcquireAdvisoryLock(ctx, pool, migrationAdvisoryLockID)
}
57 changes: 57 additions & 0 deletions pkg/dblock/dblock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package dblock

import (
"context"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the blank line between context and time.

"time"

"github.com/pkg/errors"
"github.com/stackrox/rox/pkg/logging"
"github.com/stackrox/rox/pkg/postgres"
"github.com/stackrox/rox/pkg/sync"
)

var (
log = logging.LoggerForModule()
)

// TryAcquireAdvisoryLock attempts to acquire a PostgreSQL advisory lock with the given ID without blocking.
// Returns whether the lock was acquired, a release function (nil if not acquired), and any error.
func TryAcquireAdvisoryLock(ctx context.Context, pool postgres.DB, lockID int64) (bool, func(), error) {
conn, err := pool.Acquire(ctx)
if err != nil {
return false, nil, errors.Wrap(err, "acquiring connection for advisory lock")
}

var acquired bool
err = conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
conn.Release()
return false, nil, errors.Wrap(err, "trying advisory lock")
}

if !acquired {
conn.Release()
return false, nil, nil
}

log.Infof("Advisory lock %d acquired.", lockID)
return true, makeRelease(conn, lockID), nil
}

func makeRelease(conn *postgres.Conn, lockID int64) func() {
once := sync.Once{}
return func() {
once.Do(func() {
unlockCtx, unlockCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer unlockCancel()
_, err := conn.Exec(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID)
if err != nil {
log.Errorf("Failed to release advisory lock %d: %v", lockID, err)
} else {
log.Infof("Advisory lock %d released.", lockID)
}
conn.Release()
})
}
}
22 changes: 12 additions & 10 deletions migrator/lock/lock_test.go → pkg/dblock/dblock_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build sql_integration

package lock
package dblock

import (
"context"
Expand All @@ -14,6 +14,8 @@ import (
"github.com/stretchr/testify/suite"
)

const testLockID int64 = 999_999_999

type AdvisoryLockSuite struct {
suite.Suite
pool postgres.DB
Expand Down Expand Up @@ -45,28 +47,28 @@ func (s *AdvisoryLockSuite) TearDownTest() {
}

func (s *AdvisoryLockSuite) TestTryAcquireAndRelease() {
acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired)
s.Require().NotNil(release)

release()

acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired2)
s.Require().NotNil(release2)
release2()
}

func (s *AdvisoryLockSuite) TestMutualExclusion() {
acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired)
s.Require().NotNil(release)

// TryAcquire should fail because lock already held by other connection.
acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool)
// TryAcquire should fail because lock already held by other connection.
acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().False(acquired2)
s.Require().Nil(release2)
Expand All @@ -75,12 +77,12 @@ func (s *AdvisoryLockSuite) TestMutualExclusion() {
}

func (s *AdvisoryLockSuite) TestReleaseAllowsReacquire() {
acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired)
release()

acquired2, release2, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired2, release2, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired2)
s.Require().NotNil(release2)
Expand All @@ -99,7 +101,7 @@ func (s *AdvisoryLockSuite) TestConcurrentTryAcquire() {
for i := 0; i < numGoroutines; i++ {
go func(idx int) {
defer wg.Done()
results[idx], releases[idx], errs[idx] = TryAcquireMigrationLock(s.ctx, s.pool)
results[idx], releases[idx], errs[idx] = TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
}(i)
}
wg.Wait()
Expand All @@ -117,7 +119,7 @@ func (s *AdvisoryLockSuite) TestConcurrentTryAcquire() {
}

func (s *AdvisoryLockSuite) TestDoubleReleaseIsIdempotent() {
acquired, release, err := TryAcquireMigrationLock(s.ctx, s.pool)
acquired, release, err := TryAcquireAdvisoryLock(s.ctx, s.pool, testLockID)
s.Require().NoError(err)
s.Require().True(acquired)

Expand Down
1 change: 1 addition & 0 deletions tools/roxvet/analyzers/validateimports/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func verifyImportsFromAllowedPackagesOnly(pass *analysis.Pass, imports []*ast.Im
"pkg/cvss/cvssv2",
"pkg/cvss/cvssv3",
"pkg/db",
"pkg/dblock",
"pkg/dberrors",
"pkg/dbhelper",
"pkg/defaults/policies",
Expand Down
Loading