Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .superpowers/brainstorm/187997-1773926811/.server-stopped
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"reason":"idle timeout","timestamp":1773995694913}
4 changes: 4 additions & 0 deletions .superpowers/brainstorm/187997-1773926811/.server.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type":"server-started","port":53648,"host":"127.0.0.1","url_host":"localhost","url":"http://localhost:53648","screen_dir":"/home/ebenshet/go/src/github.com/stackrox/stackrox/.superpowers/brainstorm/187997-1773926811"}
{"type":"screen-added","file":"/home/ebenshet/go/src/github.com/stackrox/stackrox/.superpowers/brainstorm/187997-1773926811/code-patterns.html"}
{"type":"screen-added","file":"/home/ebenshet/go/src/github.com/stackrox/stackrox/.superpowers/brainstorm/187997-1773926811/architecture-map.html"}
{"type":"server-stopped","reason":"idle timeout"}
1 change: 1 addition & 0 deletions .superpowers/brainstorm/187997-1773926811/.server.pid
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
188005
475 changes: 475 additions & 0 deletions .superpowers/brainstorm/187997-1773926811/architecture-map.html

Large diffs are not rendered by default.

189 changes: 189 additions & 0 deletions .superpowers/brainstorm/187997-1773926811/code-patterns.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
<h2>Central's Three Module Archetypes - Real Code Examples</h2>
<p class="subtitle">Each archetype has different concurrency patterns, different metric needs, and different levels of existing instrumentation</p>

<div class="section">
<h3>Archetype 1: Sensor Message Pipeline (Worker Queue)</h3>
<div class="mockup">
<div class="mockup-header">central/sensor/service/connection/worker_queue.go</div>
<div class="mockup-body" style="font-family: monospace; font-size: 13px; white-space: pre; overflow-x: auto; background: #1e1e2e; color: #cdd6f4; padding: 16px; line-height: 1.5;">
<span style="color:#89b4fa">// Hash-sharded deduping queues (poolSize+1 workers)</span>
type workerQueue struct {
poolSize int
queues []*dedupingqueue.DedupingQueue[string] <span style="color:#a6e3a1">// each queue = 1 goroutine</span>
}

<span style="color:#89b4fa">// Push: FNV hash routes msg to a specific queue shard</span>
func (w *workerQueue) push(msg *central.MsgFromSensor) {
idx := w.indexFromKey(msg.GetHashKey())
w.queues[idx].Push(msg) <span style="color:#f38ba8">// NO depth gauge, NO push timing</span>
}

<span style="color:#89b4fa">// Workers: pull blocking, handle, retry on transient errors</span>
func (w *workerQueue) runWorker(...) {
for msg := queue.PullBlocking(stopSig); msg != nil; ... {
err := handler(ctx, msgFromSensor)
if pgutils.IsTransientError(err) {
<span style="color:#f38ba8">// Retry with backoff - NO retry count metric</span>
concurrency.AfterFunc(reprocessingDuration, ...)
}
}
}</div>
</div>
<div class="pros-cons">
<div class="pros">
<h4>Existing Metrics</h4>
<ul>
<li><code>sensor_event_queue</code> - CounterVec (Add/Remove/Dedupe ops)</li>
<li><code>sensor_event_duration</code> - Histogram (processing time per type)</li>
<li><code>sensor_event_deduper</code> - Counter (passed/deduped)</li>
<li><code>pipeline_panics</code> - Counter per resource type</li>
</ul>
</div>
<div class="cons">
<h4>Missing Metrics</h4>
<ul>
<li>Queue depth gauge (how deep is each shard?)</li>
<li>Worker utilization (busy vs idle time)</li>
<li>Retry count / permanent failure count</li>
<li>Per-shard processing rate</li>
<li>Items in-flight (being processed right now)</li>
<li>End-to-end latency (enqueue to completion)</li>
</ul>
</div>
</div>
</div>

<div class="section">
<h3>Archetype 2: Background Ticker Loop (Reprocessor)</h3>
<div class="mockup">
<div class="mockup-header">central/reprocessor/reprocessor.go</div>
<div class="mockup-body" style="font-family: monospace; font-size: 13px; white-space: pre; overflow-x: auto; background: #1e1e2e; color: #cdd6f4; padding: 16px; line-height: 1.5;">
<span style="color:#89b4fa">// Three independent ticker loops, each in its own goroutine</span>
func (l *loopImpl) Start() {
l.enrichAndDetectTicker = time.NewTicker(4 * time.Hour)
l.deploymentRiskTicker = time.NewTicker(riskInterval)
l.activeComponentTicker = time.NewTicker(activeInterval)
go l.riskLoop()
go l.enrichLoop()
go l.activeComponentLoop()
}

<span style="color:#89b4fa">// enrichLoop: select on stop, short-circuit, signature, ticker</span>
func (l *loopImpl) enrichLoop() {
for !l.stopSig.IsDone() {
select {
case &lt;-l.enrichAndDetectTicker.C:
l.runReprocessing(ForceRefetchCachedValuesOnly)
<span style="color:#f38ba8">// Only tracks total duration, not per-entity</span>
}
}
}

<span style="color:#89b4fa">// Semaphore-limited concurrent processing (5 goroutines max)</span>
func (l *loopImpl) runReprocessingForObjects(...) {
sema := semaphore.NewWeighted(5)
for _, id := range ids {
go func(id string) {
defer sema.Release(1)
individualReprocessFunc(id) <span style="color:#f38ba8">// NO per-item timing</span>
}(id)
}
}</div>
</div>
<div class="pros-cons">
<div class="pros">
<h4>Existing Metrics</h4>
<ul>
<li><code>reprocessor_duration_seconds</code> - Gauge (total loop time)</li>
<li><code>signature_verification_reprocessor_duration_seconds</code> - Gauge</li>
<li><code>risk_processing_duration</code> - Histogram</li>
</ul>
</div>
<div class="cons">
<h4>Missing Metrics</h4>
<ul>
<li>Is the loop currently running? (boolean gauge)</li>
<li>How many items in current batch?</li>
<li>Items processed / failed per run</li>
<li>Semaphore utilization (how many of 5 slots used?)</li>
<li>Time since last successful run</li>
<li>Individual entity processing duration histogram</li>
<li>Number of runs completed (counter)</li>
</ul>
</div>
</div>
</div>

<div class="section">
<h3>Archetype 3: Enrichment/Detection (Request-Driven Processing)</h3>
<div class="mockup">
<div class="mockup-header">central/enrichment/enricher_impl.go + central/detection/lifecycle/manager_impl.go</div>
<div class="mockup-body" style="font-family: monospace; font-size: 13px; white-space: pre; overflow-x: auto; background: #1e1e2e; color: #cdd6f4; padding: 16px; line-height: 1.5;">
<span style="color:#89b4fa">// Enrichment: called inline from pipeline AND from reprocessor</span>
<span style="color:#89b4fa">// Two different call paths, same underlying function</span>

<span style="color:#89b4fa">// Path 1: Pipeline fragment triggers enrichment synchronously</span>
func (p *pipelineImpl) Run(ctx, clusterID, msg) {
deployment := msg.GetEvent().GetDeployment()
images, _, _ := e.enricher.EnrichDeployment(ctx, deployment)
<span style="color:#f38ba8">// NO metrics on enrichment call count or success rate</span>
detector.ProcessDeployment(ctx, deployment, images)
<span style="color:#f38ba8">// NO metrics on detection outcome counts</span>
}

<span style="color:#89b4fa">// Path 2: Reprocessor triggers enrichment in background</span>
func (l *loopImpl) reprocessImage(id string, ...) {
result, err := reprocessingFunc(ctx, enrichCtx, image)
if result.ImageUpdated {
l.risk.CalculateRiskAndUpsertImage(image)
}
<span style="color:#f38ba8">// Only counts nReprocessed at batch level</span>
}

<span style="color:#89b4fa">// Image scan semaphore limits concurrent scanner calls</span>
internalScanSemaphore = semaphore.NewWeighted(maxParallelScans)
<span style="color:#a6e3a1">// This one HAS metrics: rox_image_scan_semaphore_*</span></div>
</div>
<div class="pros-cons">
<div class="pros">
<h4>Existing Metrics</h4>
<ul>
<li><code>image_scan_semaphore_holding_size</code> - active scans</li>
<li><code>image_scan_semaphore_queue_size</code> - queued scans</li>
<li><code>deployment_enhancement_duration_ms</code> - round trip time</li>
<li><code>resource_processed_count</code> - per resource type</li>
</ul>
</div>
<div class="cons">
<h4>Missing Metrics</h4>
<ul>
<li>Enrichment call count (by trigger: pipeline vs reprocessor)</li>
<li>Enrichment success/failure/skip rate</li>
<li>Detection: alerts generated per policy evaluation</li>
<li>Detection: violations found vs clean deployments</li>
<li>Risk calculation duration per entity type</li>
<li>Cache hit rate for enrichment data</li>
</ul>
</div>
</div>
</div>

<div class="section" style="margin-top: 32px; padding: 20px; border: 2px solid #89b4fa; border-radius: 8px;">
<h3>Key Insight: The DedupingQueue Already Has Metric Hooks</h3>
<p>Both <code>pkg/queue/Queue[T]</code> and <code>pkg/dedupingqueue/DedupingQueue[K]</code> already accept optional Prometheus metrics via functional options:</p>
<div style="font-family: monospace; font-size: 13px; background: #1e1e2e; color: #cdd6f4; padding: 16px; border-radius: 4px; white-space: pre; overflow-x: auto;">
<span style="color:#a6e3a1">// Queue[T] accepts:</span>
WithCounterVec[T](vec *prometheus.CounterVec) <span style="color:#89b4fa">// tracks Add/Remove</span>
WithDroppedMetric[T](metric prometheus.Counter) <span style="color:#89b4fa">// tracks drops when full</span>

<span style="color:#a6e3a1">// DedupingQueue[K] accepts:</span>
WithSizeMetrics[K](metric prometheus.Gauge) <span style="color:#89b4fa">// tracks queue depth</span>
WithOperationMetricsFunc[K](fn func(ops.Op, string)) <span style="color:#89b4fa">// tracks Add/Remove/Dedupe</span>

<span style="color:#f9e2af">// But NO built-in support for:</span>
<span style="color:#f9e2af">// - Processing duration histograms</span>
<span style="color:#f9e2af">// - In-flight item counts</span>
<span style="color:#f9e2af">// - Error/retry counters</span>
<span style="color:#f9e2af">// - Worker utilization gauges</span></div>
</div>
</div>
Loading