Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 109 additions & 26 deletions sensor/kubernetes/fake/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
)

var (
ipPool = newPool()
containerPool = newPool()
endpointPool = newEndpointPool()
ipPool = newPool()
externalIpPool = newPool()
containerPool = newPool()
endpointPool = newEndpointPool()

registeredHostConnections []manager.HostNetworkInfo
)
Expand Down Expand Up @@ -120,6 +121,30 @@
return fmt.Sprintf("10.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256))
}

// Generate IP addresses from 11.0.0.0 to 99.255.255.255 which are all public
func generateExternalIP() string {
return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(89)+11, rand.Intn(256), rand.Intn(256), rand.Intn(256))
}

// We want to reuse some external IPs, so we test the cases where multiple
// entities connect to the same external IP, but we also want many external IPs
// that are only used once.
func generateExternalIPPool() {
ip := []int{11, 0, 0, 0}
for range 1000 {
for j := 3; j >= 0; j-- {
ip[j]++
if ip[j] > 255 {
ip[j] = 0
} else {
break
}
}
ipString := fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
externalIpPool.add(ipString)
}
}

func generateAndAddIPToPool() string {
ip := generateIP()
for !ipPool.add(ip) {
Expand Down Expand Up @@ -176,17 +201,6 @@
return getNetworkProcessUniqueKeyFromProcess(process)
}

func getNetworkEndpointFromConnectionAndOriginator(conn *sensor.NetworkConnection, originator *storage.NetworkProcessUniqueKey) *sensor.NetworkEndpoint {
return &sensor.NetworkEndpoint{
SocketFamily: conn.SocketFamily,
Protocol: conn.Protocol,
ListenAddress: conn.LocalAddress,
ContainerId: conn.ContainerId,
CloseTimestamp: nil,
Originator: originator,
}
}

func makeNetworkConnection(src string, dst string, containerID string, closeTimestamp time.Time) *sensor.NetworkConnection {
closeTS, err := protocompat.ConvertTimeToTimestampOrError(closeTimestamp)
if err != nil {
Expand All @@ -210,32 +224,99 @@
}
}

// Randomly decide to get an interal or external IP, with an 80% chance of the IP
// being internal and 20% of being external. If the IP is external randomly decide
// to pick it from a pool of external IPs or a new external IP, with a 50/50 chance
// of being from the pool or a newly generated IP address. We want to have cases
// where multiple different entities connect to the same external IP, but we also
// want a large number of unique external IPs.
func (w *WorkloadManager) getRandomInternalExternalIP() (string, bool, bool) {
ip := ""
var ok bool

internal := rand.Intn(100) < 80
if internal {
ip, ok = ipPool.randomElem()
} else {
if rand.Intn(100) < 50 {
ip, ok = externalIpPool.randomElem()
} else {
ip = generateExternalIP()
ok = true
}
}

if !ok {
log.Errorf("Found no IPs in the %s pool", map[bool]string{true: "internal", false: "external"}[internal])
}

return ip, internal, ok
}

func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) {
src, internal, ok := w.getRandomInternalExternalIP()
if !ok {
return "", "", false
}
var dst string
// If the src is internal, the dst can be internal or external, but
// if the src is external, the dst must be internal.
if internal {
dst, _, ok = w.getRandomInternalExternalIP()
} else {
dst, ok = ipPool.randomElem()
if !ok {
log.Error("Found no IPs in the internal pool")
}

Check warning on line 270 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L269-L270

Added lines #L269 - L270 were not covered by tests
}

return src, dst, ok
}

func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) {
originator := getRandomOriginator(containerID)

ip, ok := ipPool.randomElem()
if !ok {
return nil, false
}

Check warning on line 282 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L276-L282

Added lines #L276 - L282 were not covered by tests

networkEndpoint := &sensor.NetworkEndpoint{
SocketFamily: sensor.SocketFamily_SOCKET_FAMILY_IPV4,
Protocol: storage.L4Protocol_L4_PROTOCOL_TCP,
ListenAddress: &sensor.NetworkAddress{
AddressData: net.ParseIP(ip).AsNetIP(),
Port: rand.Uint32() % 63556,
},
ContainerId: containerID,
CloseTimestamp: nil,
Originator: originator,
}

return networkEndpoint, ok

Check warning on line 296 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L284-L296

Added lines #L284 - L296 were not covered by tests
}

func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) *sensor.NetworkConnectionInfo {
conns := make([]*sensor.NetworkConnection, 0, workload.BatchSize)
networkEndpoints := make([]*sensor.NetworkEndpoint, 0, workload.BatchSize)
for i := 0; i < workload.BatchSize; i++ {
src, ok := ipPool.randomElem()
if !ok {
log.Error("found no IPs in pool")
continue
}
dst, ok := ipPool.randomElem()
src, dst, ok := w.getRandomSrcDst()

Check warning on line 303 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L303

Added line #L303 was not covered by tests
if !ok {
log.Error("found no IPs in pool")
continue
}

containerID, ok := containerPool.randomElem()
if !ok {
log.Error("found no containers in pool")
log.Error("Found no containers in pool")

Check warning on line 310 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L310

Added line #L310 was not covered by tests
continue
}

conn := makeNetworkConnection(src, dst, containerID, time.Now().Add(-5*time.Second))

originator := getRandomOriginator(containerID)

networkEndpoint := getNetworkEndpointFromConnectionAndOriginator(conn, originator)
networkEndpoint, ok := w.getRandomNetworkEndpoint(containerID)
if !ok {
log.Error("Found no IPs in the internal pool")
continue

Check warning on line 318 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L315-L318

Added lines #L315 - L318 were not covered by tests
}

conns = append(conns, conn)
if endpointPool.Size < endpointPool.Capacity {
Expand Down Expand Up @@ -273,6 +354,8 @@
ticker := time.NewTicker(workload.FlowInterval)
defer ticker.Stop()

generateExternalIPPool()

Check warning on line 358 in sensor/kubernetes/fake/flows.go

View check run for this annotation

Codecov / codecov/patch

sensor/kubernetes/fake/flows.go#L357-L358

Added lines #L357 - L358 were not covered by tests
for {
select {
case <-ctx.Done():
Expand Down
42 changes: 42 additions & 0 deletions sensor/kubernetes/fake/flows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package fake

import (
"testing"

"github.com/stackrox/rox/pkg/net"
"github.com/stretchr/testify/suite"
)

type flowsSuite struct {
suite.Suite
}

func TestFlowsSuite(t *testing.T) {
suite.Run(t, new(flowsSuite))
}

func (s *flowsSuite) TestGetRandomInternalExternalIP() {
var w WorkloadManager

_, _, ok := w.getRandomSrcDst()
s.False(ok)

for range 1000 {
generateAndAddIPToPool()
}

generateExternalIPPool()

for range 1000 {
ip, internal, ok := w.getRandomInternalExternalIP()
s.True(ok)
s.Equal(internal, !net.ParseIP(ip).IsPublic())
}

for range 1000 {
src, dst, ok := w.getRandomSrcDst()
// At least one has to be internal
s.True(!net.ParseIP(src).IsPublic() || !net.ParseIP(dst).IsPublic())
s.True(ok)
}
}
Loading