feat(fake): Add random Node IPs and Offline/Online to fake workloads #19549
Conversation
This change is part of the following stack: Change managed by git-spice.

Skipping CI for Draft Pull Request.
Hey - I've found 3 issues, and left some high-level feedback:
- The global `nextNodePort` counter in `sensor/kubernetes/fake/service.go` is not concurrency-safe and will produce non-deterministic behavior if fake services are created from multiple goroutines; consider scoping it to a `WorkloadManager` instance or protecting it with a mutex.
- In `validateWorkload`, you both clamp `CentralConnectionCrashCycle` to 0 and return an error; if `NewWorkloadManager` treats any error as fatal this will prevent using an otherwise valid workload—either avoid returning an error for corrected values or handle this case as a non-fatal warning at the call site.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The global `nextNodePort` counter in `sensor/kubernetes/fake/service.go` is not concurrency-safe and will produce non-deterministic behavior if fake services are created from multiple goroutines; consider scoping it to a `WorkloadManager` instance or protecting it with a mutex.
- In `validateWorkload`, you both clamp `CentralConnectionCrashCycle` to 0 and return an error; if `NewWorkloadManager` treats any error as fatal this will prevent using an otherwise valid workload—either avoid returning an error for corrected values or handle this case as a non-fatal warning at the call site.
## Individual Comments
### Comment 1
<location path="sensor/kubernetes/fake/fake.go" line_range="318-320" />
<code_context>
}
func validateWorkload(workload *Workload) error {
+ if workload.CentralConnectionCrashCycle < 0 {
+ workload.CentralConnectionCrashCycle = 0
+ return errors.New("negative centralConnectionCrashCycle, clamped to 0")
+ }
if workload.NetworkWorkload.OpenPortReuseProbability < 0.0 || workload.NetworkWorkload.OpenPortReuseProbability > 1.0 {
</code_context>
<issue_to_address>
**issue (bug_risk):** Clamping `CentralConnectionCrashCycle` to 0 while still returning an error may be inconsistent with the validation pattern.
Elsewhere in this function (e.g. `OpenPortReuseProbability`), invalid values are only clamped and do not prevent workload loading. Here, a negative `centralConnectionCrashCycle` both clamps and fails. Please either (a) treat this like the other fields (clamp + log/record the issue, no error) or (b) remove the clamping and fail fast, so callers aren’t misled into thinking the original value was accepted.
</issue_to_address>
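The "clamp and log, no error" alternative suggested in option (a) can be sketched as follows. This is a hypothetical illustration, not the repository's actual code: the `Workload` struct here is reduced to the two fields the review mentions, and the log messages are assumptions.

```go
// Sketch of option (a): clamp invalid values and record a warning instead of
// failing validation, matching the pattern used for OpenPortReuseProbability.
package main

import (
	"fmt"
	"log"
)

// Workload mirrors only the fields referenced in the review; this reduced
// shape is an assumption for illustration.
type Workload struct {
	CentralConnectionCrashCycle int
	OpenPortReuseProbability    float64
}

func validateWorkload(workload *Workload) error {
	if workload.CentralConnectionCrashCycle < 0 {
		log.Printf("negative centralConnectionCrashCycle (%d), clamping to 0",
			workload.CentralConnectionCrashCycle)
		workload.CentralConnectionCrashCycle = 0 // clamp, but do not fail
	}
	if workload.OpenPortReuseProbability < 0.0 || workload.OpenPortReuseProbability > 1.0 {
		log.Printf("openPortReuseProbability out of range, clamping to [0, 1]")
		if workload.OpenPortReuseProbability < 0.0 {
			workload.OpenPortReuseProbability = 0.0
		} else {
			workload.OpenPortReuseProbability = 1.0
		}
	}
	return nil // corrected values never abort workload loading
}

func main() {
	w := &Workload{CentralConnectionCrashCycle: -5, OpenPortReuseProbability: 1.5}
	_ = validateWorkload(w)
	fmt.Printf("crashCycle=%d reuse=%.1f\n",
		w.CentralConnectionCrashCycle, w.OpenPortReuseProbability)
}
```

With this shape, callers of `NewWorkloadManager` never see a fatal error for a value that was silently corrected, which is the inconsistency the review flags.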
### Comment 2
<location path="sensor/kubernetes/fake/service.go" line_range="15-18" />
<code_context>
var (
protocols = [...]string{"TCP", "UDP", "SCTP"}
ipFamilies = [...]string{"IPv4", "IPv6"}
+ nextNodePort int32 = 30000
)
</code_context>
<issue_to_address>
**issue (bug_risk):** Global `nextNodePort` counter is not concurrency-safe and could generate duplicate ports under concurrent use.
`getUniqueNodePort` increments the package-level `nextNodePort` without any synchronization, so concurrent calls could race and produce duplicate ports. If concurrent use is expected, please guard `nextNodePort` with a mutex or `atomic.AddInt32`. If it’s guaranteed to be single-threaded, document that assumption near this helper to prevent future misuse.
</issue_to_address>
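The `atomic.AddInt32` fix suggested above can be sketched as below. This is a minimal standalone illustration under the assumption that `getUniqueNodePort` only needs to hand out monotonically increasing ports; the surrounding package and any upper-bound handling for the NodePort range are omitted.

```go
// Sketch: make the node-port counter safe for concurrent use with
// sync/atomic instead of an unsynchronized package-level increment.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var nextNodePort int32 = 30000

// getUniqueNodePort atomically reserves and returns the next node port,
// so concurrent callers can never observe the same value.
func getUniqueNodePort() int32 {
	// AddInt32 returns the incremented value; subtracting 1 yields the
	// value this caller reserved.
	return atomic.AddInt32(&nextNodePort, 1) - 1
}

func main() {
	var wg sync.WaitGroup
	var seen sync.Map
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p := getUniqueNodePort()
			if _, dup := seen.LoadOrStore(p, true); dup {
				panic("duplicate port issued under concurrency")
			}
		}()
	}
	wg.Wait()
	fmt.Println("100 goroutines each received a unique port")
}
```

Scoping the counter to a `WorkloadManager` field, as the review also suggests, would additionally avoid cross-test interference from package-level state.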
### Comment 3
<location path="tools/local-sensor/main.go" line_range="482" />
<code_context>
}
-func setupCentralWithFakeConnection(localConfig localSensorConfig) (centralclient.CentralConnectionFactory, centralclient.CertLoader, *centralDebug.FakeService) {
+func createFakeCentralService(localConfig localSensorConfig) *centralDebug.FakeService {
utils.CrashOnError(os.Setenv("ROX_MTLS_CERT_FILE", "tools/local-sensor/certs/cert.pem"))
utils.CrashOnError(os.Setenv("ROX_MTLS_KEY_FILE", "tools/local-sensor/certs/key.pem"))
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the fake-central setup into a single helper and wrapping the crash-cycle behavior in a small type to make main’s wiring clearer and reduce parameter sprawl.
One concrete way to reduce the complexity without changing behavior is to (1) collapse the fake‑central wiring into a single cohesive helper, and (2) encapsulate the crash‑cycle logic into a small type instead of a free function with many parameters.
### 1. Collapse fake‑central wiring into one helper
Right now the flow is split across `createFakeCentralService`, `startFakeCentralServer`, and `setupCentralWithFakeConnection`, and you also return both `centralclient.CentralConnectionFactory` and `centralDebug.FakeGRPCFactory` (which are currently the same object).
You can keep all behavior but simplify the call graph and parameter threading by collapsing the setup into a single helper that returns the fake factory (which also implements `CentralConnectionFactory`) plus the spy:
```go
// keeps env setup + initial messages + server start in one place
func setupCentralWithFakeConnection(localConfig localSensorConfig) (
centralDebug.FakeGRPCFactory, // also satisfies CentralConnectionFactory
centralclient.CertLoader,
*centralDebug.FakeService,
) {
utils.CrashOnError(os.Setenv("ROX_MTLS_CERT_FILE", "tools/local-sensor/certs/cert.pem"))
utils.CrashOnError(os.Setenv("ROX_MTLS_KEY_FILE", "tools/local-sensor/certs/key.pem"))
utils.CrashOnError(os.Setenv("ROX_MTLS_CA_FILE", "tools/local-sensor/certs/caCert.pem"))
utils.CrashOnError(os.Setenv("ROX_MTLS_CA_KEY_FILE", "tools/local-sensor/certs/caKey.pem"))
// existing policy loading + initialMessages setup...
fakeCentral := centralDebug.MakeFakeCentralWithInitialMessages(initialMessages...)
if localConfig.Verbose {
fakeCentral.OnMessage(func(msg *central.MsgFromSensor) {
log.Printf("MESSAGE RECEIVED: %s\n", msg.String())
})
}
fakeCentral.SetMessageRecording(!localConfig.SkipCentralOutput)
conn, spyCentral, shutdownFakeServer := createConnectionAndStartServer(fakeCentral)
fakeCentral.OnShutdown(shutdownFakeServer)
fakeFactory := centralDebug.MakeFakeConnectionFactory(conn)
return fakeFactory, centralclient.EmptyCertLoader(), spyCentral
}
```
Then in `main` you only need one factory variable, and it’s clear that fake and real paths both provide the same abstraction:
```go
var connection centralclient.CentralConnectionFactory
var fakeFactory centralDebug.FakeGRPCFactory
var certLoader centralclient.CertLoader
var spyCentral *centralDebug.FakeService
if isFakeCentral {
fakeFactory, certLoader, spyCentral = setupCentralWithFakeConnection(localConfig)
connection = fakeFactory
defer spyCentral.Stop()
} else {
connection, certLoader = setupCentralWithRealConnection(k8sClient, localConfig)
}
```
This removes `createFakeCentralService` and `startFakeCentralServer` and the “double factory” signature.
You can keep a small internal helper for server restarts used by the crash cycle if you like, but it’s clearer if it’s narrowly scoped:
```go
func restartFakeCentralServer(localConfig localSensorConfig) (*centralDebug.FakeService, *grpc.ClientConn) {
// minimal subset: construct fakeCentral + start server
// re-use pieces of setupCentralWithFakeConnection as needed
}
```
### 2. Encapsulate crash‑cycle logic in a small type
`startCentralCrashCycle` currently takes many parameters and has fairly dense goroutine logic. You can keep the behavior but move the lifecycle details into a small type so `main` only orchestrates:
```go
type centralCrashCycler struct {
ctx context.Context
localConfig localSensorConfig
factory centralDebug.FakeGRPCFactory
initial *centralDebug.FakeService
}
func newCentralCrashCycler(
ctx context.Context,
localConfig localSensorConfig,
factory centralDebug.FakeGRPCFactory,
initial *centralDebug.FakeService,
) *centralCrashCycler {
return &centralCrashCycler{
ctx: ctx,
localConfig: localConfig,
factory: factory,
initial: initial,
}
}
func (c *centralCrashCycler) Start(cycle time.Duration, sensor *commonSensor.Sensor) {
if cycle <= 0 || sensor == nil {
return
}
log.Printf("Enabled Central connection crash cycle every %s", cycle)
go func() {
ticker := time.NewTicker(cycle)
defer ticker.Stop()
currentCentral := c.initial
for {
select {
case <-c.ctx.Done():
if currentCentral != nil {
currentCentral.Stop()
}
return
case <-ticker.C:
log.Printf("Triggering Central connection crash cycle")
if c.factory != nil && currentCentral != nil {
replacementCentral, replacementConn := restartFakeCentralServer(c.localConfig)
c.factory.OverwriteCentralConnection(replacementConn)
prev := currentCentral
currentCentral = replacementCentral
if !prev.KillSwitch.IsDone() {
prev.KillSwitch.Signal()
}
prev.Stop()
continue
}
sensor.RequestCentralConnectionRestart("local-sensor centralConnectionCrashCycle")
}
}
}()
}
```
`main` then becomes simpler and less coupled:
```go
if workloadManager != nil {
cycler := newCentralCrashCycler(ctx, localConfig, fakeFactory, spyCentral)
cycler.Start(workloadManager.CentralConnectionCrashCycle(), s)
}
```
This keeps all the crash‑cycle behavior (including the fake‑central rotation semantics) but:
- Removes the wide parameter list from `startCentralCrashCycle`.
- Keeps `main` focused on wiring and configuration, not ticker/kill‑switch orchestration.
- Aligns the fake‑central abstractions so you don’t have to juggle two parallel “factory” parameters through `main`.
</issue_to_address>
Images are ready for the commit at 14f86c5. To use with deploy scripts, first
Codecov Report

❌ Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff            @@
##           master   #19549    +/-   ##
==========================================
- Coverage   49.26%   49.25%   -0.01%
==========================================
  Files        2727     2727
  Lines      205788   205807      +19
==========================================
- Hits       101383   101380       -3
- Misses      96874    96894      +20
- Partials     7531     7533       +2
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Description
TODO - Draft code as a PoC - need to review and refactor.
Reproduction of ROX-33474
Run fake workloads with multiple nodes having node ports:
Result (after ~2 minutes):
Mitigation: Disable history
Result: workload runs for the full 15 minutes. Expected to run forever if the `-duration` flag is omitted.

User-facing documentation
Testing and quality
Automated testing
How I validated my change
change me!