
feat(fake): Add random Node IPs and Offline/Online to fake workloads #19549

Draft
vikin91 wants to merge 2 commits into master from piotr/node-ports-reproduction

Conversation


@vikin91 vikin91 commented Mar 23, 2026

Description

TODO: draft code as a PoC; still needs review and refactoring.

Reproduction of ROX-33474

Run fake workloads with multiple nodes that expose node ports:

KUBECONFIG="$HOME/.cluster1/kubeconfig"  go run ./tools/local-sensor \
    -with-fakeworkload scale/workloads/high-node-count.yaml \
    -with-metrics \
    -with-pprof-server \
    -duration 15m \
    -skip-central-output

Result (after ~2 minutes):

Action RWMutex.Unlock took more than 10s to complete. Stack trace:
github.com/stackrox/rox/pkg/sync.(*RWMutex).Unlock (/$HOME/src/go/src/github.com/stackrox/stackrox/pkg/sync/mutex_dev.go:129)

Mitigation: disable history by setting ROX_PAST_CLUSTER_ENTITIES_MEMORY_SIZE=0

KUBECONFIG="$HOME/.cluster1/kubeconfig" ROX_PAST_CLUSTER_ENTITIES_MEMORY_SIZE=0 go run ./tools/local-sensor \
    -with-fakeworkload scale/workloads/high-node-count.yaml \
    -with-metrics \
    -with-pprof-server \
    -duration 15m \
    -skip-central-output

Result: the workload runs for the full 15 minutes. It is expected to run indefinitely if -duration is omitted.

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

change me!


vikin91 commented Mar 23, 2026

This change is part of the following stack:

Change managed by git-spice.


openshift-ci bot commented Mar 23, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 3 issues and left some high-level feedback:

  • The global nextNodePort counter in sensor/kubernetes/fake/service.go is not concurrency-safe and will produce non-deterministic behavior if fake services are created from multiple goroutines; consider scoping it to a WorkloadManager instance or protecting it with a mutex.
  • In validateWorkload, you both clamp CentralConnectionCrashCycle to 0 and return an error; if NewWorkloadManager treats any error as fatal this will prevent using an otherwise valid workload—either avoid returning an error for corrected values or handle this case as a non-fatal warning at the call site.

## Individual Comments

### Comment 1
<location path="sensor/kubernetes/fake/fake.go" line_range="318-320" />
<code_context>
 }

 func validateWorkload(workload *Workload) error {
+	if workload.CentralConnectionCrashCycle < 0 {
+		workload.CentralConnectionCrashCycle = 0
+		return errors.New("negative centralConnectionCrashCycle, clamped to 0")
+	}
 	if workload.NetworkWorkload.OpenPortReuseProbability < 0.0 || workload.NetworkWorkload.OpenPortReuseProbability > 1.0 {
</code_context>
<issue_to_address>
**issue (bug_risk):** Clamping `CentralConnectionCrashCycle` to 0 while still returning an error may be inconsistent with the validation pattern.

Elsewhere in this function (e.g. `OpenPortReuseProbability`), invalid values are only clamped and do not prevent workload loading. Here, a negative `centralConnectionCrashCycle` both clamps and fails. Please either (a) treat this like the other fields (clamp + log/record the issue, no error) or (b) remove the clamping and fail fast, so callers aren’t misled into thinking the original value was accepted.
</issue_to_address>

### Comment 2
<location path="sensor/kubernetes/fake/service.go" line_range="15-18" />
<code_context>
 var (
 	protocols  = [...]string{"TCP", "UDP", "SCTP"}
 	ipFamilies = [...]string{"IPv4", "IPv6"}
+	nextNodePort int32 = 30000
 )

</code_context>
<issue_to_address>
**issue (bug_risk):** Global `nextNodePort` counter is not concurrency-safe and could generate duplicate ports under concurrent use.

`getUniqueNodePort` increments the package-level `nextNodePort` without any synchronization, so concurrent calls could race and produce duplicate ports. If concurrent use is expected, please guard `nextNodePort` with a mutex or `atomic.AddInt32`. If it’s guaranteed to be single-threaded, document that assumption near this helper to prevent future misuse.
</issue_to_address>

### Comment 3
<location path="tools/local-sensor/main.go" line_range="482" />
<code_context>
 }

-func setupCentralWithFakeConnection(localConfig localSensorConfig) (centralclient.CentralConnectionFactory, centralclient.CertLoader, *centralDebug.FakeService) {
+func createFakeCentralService(localConfig localSensorConfig) *centralDebug.FakeService {
 	utils.CrashOnError(os.Setenv("ROX_MTLS_CERT_FILE", "tools/local-sensor/certs/cert.pem"))
 	utils.CrashOnError(os.Setenv("ROX_MTLS_KEY_FILE", "tools/local-sensor/certs/key.pem"))
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the fake-central setup into a single helper and wrapping the crash-cycle behavior in a small type to make main’s wiring clearer and reduce parameter sprawl.

One concrete way to reduce the complexity without changing behavior is to (1) collapse the fake‑central wiring into a single cohesive helper, and (2) encapsulate the crash‑cycle logic into a small type instead of a free function with many parameters.

### 1. Collapse fake‑central wiring into one helper

Right now the flow is split across `createFakeCentralService`, `startFakeCentralServer`, and `setupCentralWithFakeConnection`, and you also return both `centralclient.CentralConnectionFactory` and `centralDebug.FakeGRPCFactory` (which are currently the same object).

You can keep all behavior but simplify the call graph and parameter threading by collapsing the setup into a single helper that returns the fake factory (which also implements `CentralConnectionFactory`) plus the spy:

```go
// keeps env setup + initial messages + server start in one place
func setupCentralWithFakeConnection(localConfig localSensorConfig) (
    centralDebug.FakeGRPCFactory, // also satisfies CentralConnectionFactory
    centralclient.CertLoader,
    *centralDebug.FakeService,
) {
    utils.CrashOnError(os.Setenv("ROX_MTLS_CERT_FILE", "tools/local-sensor/certs/cert.pem"))
    utils.CrashOnError(os.Setenv("ROX_MTLS_KEY_FILE", "tools/local-sensor/certs/key.pem"))
    utils.CrashOnError(os.Setenv("ROX_MTLS_CA_FILE", "tools/local-sensor/certs/caCert.pem"))
    utils.CrashOnError(os.Setenv("ROX_MTLS_CA_KEY_FILE", "tools/local-sensor/certs/caKey.pem"))

    // existing policy loading + initialMessages setup...
    fakeCentral := centralDebug.MakeFakeCentralWithInitialMessages(initialMessages...)
    if localConfig.Verbose {
        fakeCentral.OnMessage(func(msg *central.MsgFromSensor) {
            log.Printf("MESSAGE RECEIVED: %s\n", msg.String())
        })
    }
    fakeCentral.SetMessageRecording(!localConfig.SkipCentralOutput)

    conn, spyCentral, shutdownFakeServer := createConnectionAndStartServer(fakeCentral)
    fakeCentral.OnShutdown(shutdownFakeServer)

    fakeFactory := centralDebug.MakeFakeConnectionFactory(conn)

    return fakeFactory, centralclient.EmptyCertLoader(), spyCentral
}
```

Then in `main` you only need one factory variable, and it’s clear that fake and real paths both provide the same abstraction:

```go
var connection centralclient.CentralConnectionFactory
var fakeFactory centralDebug.FakeGRPCFactory
var certLoader centralclient.CertLoader
var spyCentral *centralDebug.FakeService

if isFakeCentral {
    fakeFactory, certLoader, spyCentral = setupCentralWithFakeConnection(localConfig)
    connection = fakeFactory
    defer spyCentral.Stop()
} else {
    connection, certLoader = setupCentralWithRealConnection(k8sClient, localConfig)
}
```

This removes `createFakeCentralService` and `startFakeCentralServer` and the “double factory” signature.

You can keep a small internal helper for server restarts used by the crash cycle if you like, but it’s clearer if it’s narrowly scoped:

```go
func restartFakeCentralServer(localConfig localSensorConfig) (*centralDebug.FakeService, *grpc.ClientConn) {
    // minimal subset: construct fakeCentral + start server
    // re-use pieces of setupCentralWithFakeConnection as needed
}
```

### 2. Encapsulate crash‑cycle logic in a small type

`startCentralCrashCycle` currently takes many parameters and has fairly dense goroutine logic. You can keep the behavior but move the lifecycle details into a small type so `main` only orchestrates:

```go
type centralCrashCycler struct {
    ctx         context.Context
    localConfig localSensorConfig
    factory     centralDebug.FakeGRPCFactory
    initial     *centralDebug.FakeService
}

func newCentralCrashCycler(
    ctx context.Context,
    localConfig localSensorConfig,
    factory centralDebug.FakeGRPCFactory,
    initial *centralDebug.FakeService,
) *centralCrashCycler {
    return &centralCrashCycler{
        ctx:         ctx,
        localConfig: localConfig,
        factory:     factory,
        initial:     initial,
    }
}

func (c *centralCrashCycler) Start(cycle time.Duration, sensor *commonSensor.Sensor) {
    if cycle <= 0 || sensor == nil {
        return
    }

    log.Printf("Enabled Central connection crash cycle every %s", cycle)

    go func() {
        ticker := time.NewTicker(cycle)
        defer ticker.Stop()
        currentCentral := c.initial

        for {
            select {
            case <-c.ctx.Done():
                if currentCentral != nil {
                    currentCentral.Stop()
                }
                return
            case <-ticker.C:
                log.Printf("Triggering Central connection crash cycle")
                if c.factory != nil && currentCentral != nil {
                    replacementCentral, replacementConn := restartFakeCentralServer(c.localConfig)
                    c.factory.OverwriteCentralConnection(replacementConn)

                    prev := currentCentral
                    currentCentral = replacementCentral
                    if !prev.KillSwitch.IsDone() {
                        prev.KillSwitch.Signal()
                    }
                    prev.Stop()
                    continue
                }
                sensor.RequestCentralConnectionRestart("local-sensor centralConnectionCrashCycle")
            }
        }
    }()
}
```

`main` then becomes simpler and less coupled:

```go
if workloadManager != nil {
    cycler := newCentralCrashCycler(ctx, localConfig, fakeFactory, spyCentral)
    cycler.Start(workloadManager.CentralConnectionCrashCycle(), s)
}
```

This keeps all the crash‑cycle behavior (including the fake‑central rotation semantics) but:

- Removes the wide parameter list from `startCentralCrashCycle`.
- Keeps `main` focused on wiring and configuration, not ticker/kill‑switch orchestration.
- Aligns the fake‑central abstractions so you don’t have to juggle two parallel “factory” parameters through `main`.
</issue_to_address>



@rhacs-bot

Images are ready for the commit at 14f86c5.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.11.x-413-g14f86c59ba.


codecov bot commented Mar 23, 2026

Codecov Report

❌ Patch coverage is 0% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.25%. Comparing base (b1cc760) to head (14f86c5).
⚠️ Report is 4 commits behind head on master.

Files with missing lines            Patch %   Lines
sensor/common/sensor/sensor.go      0.00%     11 Missing ⚠️
sensor/kubernetes/fake/fake.go      0.00%     9 Missing ⚠️
sensor/kubernetes/fake/service.go   0.00%     9 Missing ⚠️
sensor/kubernetes/fake/nodes.go     0.00%     2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #19549      +/-   ##
==========================================
- Coverage   49.26%   49.25%   -0.01%     
==========================================
  Files        2727     2727              
  Lines      205788   205807      +19     
==========================================
- Hits       101383   101380       -3     
- Misses      96874    96894      +20     
- Partials     7531     7533       +2     
Flag Coverage Δ
go-unit-tests 49.25% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

