From c3d86a7ea01664e89f207cd915722ed7a3e5c3b9 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 10 Jul 2025 09:50:53 +0200 Subject: [PATCH 1/3] wip: first test --- .../30-install-airflow-cluster.yaml.j2 | 63 +++++++++++++++++++ .../mount-dags-configmap/60-assert.yaml.j2 | 12 ---- .../60-install-metrics-script.yaml | 8 --- 3 files changed, 63 insertions(+), 20 deletions(-) delete mode 100644 tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/mount-dags-configmap/60-install-metrics-script.yaml diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index 324c0642..1052203a 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -57,6 +57,39 @@ data: bash_command='echo "Here is the message: $message"', env={'message': '{% raw %}{{ dag_run.conf.get("message") }}{% endraw %}'}, ) + dag_factory.py: | + from airflow import DAG + from airflow.operators.empty import EmptyOperator + from datetime import datetime, timedelta + + # Number of DAGs to generate + NUM_DAGS = 1200 # Increase for more stress + DAG_PREFIX = "stress_dag_" + + default_args = { + 'owner': 'airflow', + 'start_date': datetime(2024, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(seconds=5), + } + + def create_dag(dag_id): + with DAG( + dag_id=dag_id, + default_args=default_args, + schedule=None, + catchup=False, + tags=["stress_test"], + ) as dag: + start = EmptyOperator(task_id='start') + end = EmptyOperator(task_id='end') + start >> end + return dag + + for i in range(NUM_DAGS): + dag_id = f"{DAG_PREFIX}{i:04d}" + globals()[dag_id] = create_dag(dag_id) + --- apiVersion: airflow.stackable.tech/v1alpha1 kind: AirflowCluster @@ -84,12 +117,40 @@ spec: - name: test-cm-dag mountPath: /dags/example_trigger_target_dag.py subPath: example_trigger_target_dag.py + - name: test-cm-dag + mountPath: /dags/dag_factory.py + subPath: dag_factory.py webservers: roleConfig: listenerClass: external-unstable config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: &logging + airflow: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG + git-sync: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG + vector: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG roleGroups: default: envOverrides: @@ -100,6 +161,7 @@ spec: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: *logging roleGroups: default: envOverrides: @@ -118,6 +180,7 @@ spec: gracefulShutdownTimeout: 10s logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: *logging roleGroups: default: envOverrides: diff --git a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 deleted file mode 100644 index 7f43f061..00000000 --- a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 +++ /dev/null @@ -1,12 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 480 -commands: -{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" -{% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" -{% endif %} diff --git a/tests/templates/kuttl/mount-dags-configmap/60-install-metrics-script.yaml b/tests/templates/kuttl/mount-dags-configmap/60-install-metrics-script.yaml deleted file mode 100644 index 17136688..00000000 --- a/tests/templates/kuttl/mount-dags-configmap/60-install-metrics-script.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -metadata: - name: metrics -commands: - - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/metrics.py test-airflow-python-0:/tmp - timeout: 240 From 7f37dd2176939f709922645c3c2ccf659ce54314 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 11 Jul 2025 15:42:05 +0200 Subject: [PATCH 2/3] added xcom test dag --- .../kuttl/mount-dags-configmap/06-cr-crb.yaml | 28 ++++++ .../30-install-airflow-cluster.yaml.j2 | 85 +++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 tests/templates/kuttl/mount-dags-configmap/06-cr-crb.yaml diff --git a/tests/templates/kuttl/mount-dags-configmap/06-cr-crb.yaml b/tests/templates/kuttl/mount-dags-configmap/06-cr-crb.yaml new file mode 100644 index 00000000..ca99c54c --- /dev/null +++ b/tests/templates/kuttl/mount-dags-configmap/06-cr-crb.yaml @@ -0,0 +1,28 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: airflow-xcom-clusterrole +rules: +- apiGroups: + - "" + resources: + - pods/exec + verbs: + - create + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: airflow-xcom-clusterrole-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: airflow-xcom-clusterrole +subjects: +- apiGroup: rbac.authorization.k8s.io + kind: Group + name: system:serviceaccounts diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index 1052203a..d9c84aee 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -90,6 +90,88 @@ data: dag_id = f"{DAG_PREFIX}{i:04d}" globals()[dag_id] = create_dag(dag_id) + fork.py: | + from __future__ import annotations + import pendulum + from airflow.providers.cncf.kubernetes.operators.pod import ( + KubernetesPodOperator, + ) + from airflow.sdk import DAG, task + + @task.branch() + def should_run(**kwargs) -> str: + """ + Determine which empty_task should be run based on if the logical date minute is even or odd. + + :param dict kwargs: Context + :return: Id of the task to run + """ + print(f"------------- exec dttm = {kwargs['logical_date']} and minute = {kwargs['logical_date'].minute}") + if kwargs["logical_date"].minute % 2 == 0: + return "branch_task_3" + return "branch_task_4" + + + with DAG( + dag_id="branching_test", + schedule="*/1 * * * *", + start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), + catchup=False, + default_args={"depends_on_past": True}, + tags=["example"], + ) as dag: + cond = should_run() + + task_1 = KubernetesPodOperator( + name=f"branch_task_1", + image="oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev", + cmds=["python"], + arguments=["-V"], + image_pull_policy="IfNotPresent", + task_id="branch_task_1", + do_xcom_push=True + ) + task_2 = KubernetesPodOperator( + name=f"branch_task_2", + image="oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev", + cmds=["python"], + arguments=["-V"], + image_pull_policy="IfNotPresent", + task_id="branch_task_2", + do_xcom_push=True + ) + task_3 = KubernetesPodOperator( + name=f"branch_task_3", + image="oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev", + cmds=["python"], + arguments=["-V"], + image_pull_policy="IfNotPresent", + task_id="branch_task_3", + do_xcom_push=True + ) + task_4 = KubernetesPodOperator( + name=f"branch_task_4", + image="oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev", + cmds=["python"], + arguments=["-V"], + image_pull_policy="IfNotPresent", + task_id="branch_task_4", + do_xcom_push=True + ) + task_5 = KubernetesPodOperator( + name=f"branch_task_5", + image="oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev", + cmds=["python"], + arguments=["-V"], + image_pull_policy="IfNotPresent", + task_id="branch_task_5", + do_xcom_push=True + ) + + task_1 >> task_2 >> cond + cond >> [task_3, task_4] + task_3 >> task_5 + --- apiVersion: airflow.stackable.tech/v1alpha1 kind: AirflowCluster @@ -120,6 +202,9 @@ spec: - name: test-cm-dag mountPath: /dags/dag_factory.py subPath: dag_factory.py + - name: test-cm-dag + mountPath: /dags/fork.py + subPath: fork.py webservers: roleConfig: listenerClass: external-unstable From 89702cdc73e0cc6cdcbc741deaba2912d23d5503 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 11 Jul 2025 17:41:36 +0200 Subject: [PATCH 3/3] add new db_init field + wip test --- deploy/helm/airflow-operator/crds/crds.yaml | 4 + .../operator-binary/src/airflow_controller.rs | 7 +- rust/operator-binary/src/crd/mod.rs | 46 ++++-- .../kuttl/db-init/00-patch-ns.yaml.j2 | 9 + tests/templates/kuttl/db-init/05-assert.yaml | 14 ++ .../kuttl/db-init/05-install-postgresql.yaml | 11 ++ tests/templates/kuttl/db-init/06-cr-crb.yaml | 28 ++++ .../templates/kuttl/db-init/10-assert.yaml.j2 | 24 +++ .../kuttl/db-init/10-install-redis.yaml.j2 | 13 ++ .../templates/kuttl/db-init/20-assert.yaml.j2 | 10 ++ ...tor-aggregator-discovery-configmap.yaml.j2 | 9 + .../templates/kuttl/db-init/30-assert.yaml.j2 | 32 ++++ .../30-install-airflow-cluster.yaml.j2 | 154 ++++++++++++++++++ tests/templates/kuttl/db-init/40-assert.yaml | 14 ++ .../db-init/40-install-airflow-python.yaml | 23 +++ .../templates/kuttl/db-init/50-assert.yaml.j2 | 12 ++ .../kuttl/db-init/50-health-check.yaml | 7 + .../templates/kuttl/db-init/70-assert.yaml.j2 | 32 ++++ ...install-airflow-cluster-no-db-init.yaml.j2 | 13 ++ .../helm-bitnami-postgresql-values.yaml.j2 | 24 +++ .../db-init/helm-bitnami-redis-values.yaml.j2 | 29 ++++ tests/test-definition.yaml | 5 + 22 files changed, 503 insertions(+), 17 deletions(-) create mode 100644 tests/templates/kuttl/db-init/00-patch-ns.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/05-assert.yaml create mode 100644 tests/templates/kuttl/db-init/05-install-postgresql.yaml create mode 100644 tests/templates/kuttl/db-init/06-cr-crb.yaml create mode 100644 tests/templates/kuttl/db-init/10-assert.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/10-install-redis.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/20-assert.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/20-install-vector-aggregator-discovery-configmap.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/30-assert.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/30-install-airflow-cluster.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/40-assert.yaml create mode 100644 tests/templates/kuttl/db-init/40-install-airflow-python.yaml create mode 100644 tests/templates/kuttl/db-init/50-assert.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/50-health-check.yaml create mode 100644 tests/templates/kuttl/db-init/70-assert.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/70-install-airflow-cluster-no-db-init.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/helm-bitnami-postgresql-values.yaml.j2 create mode 100644 tests/templates/kuttl/db-init/helm-bitnami-redis-values.yaml.j2 diff --git a/deploy/helm/airflow-operator/crds/crds.yaml b/deploy/helm/airflow-operator/crds/crds.yaml index b56fe56d..4c824bfd 100644 --- a/deploy/helm/airflow-operator/crds/crds.yaml +++ b/deploy/helm/airflow-operator/crds/crds.yaml @@ -591,6 +591,10 @@ spec: - repo type: object type: array + dbInit: + default: true + description: Whether to migrate/upgrade the database on start-up and add a named user. These operations are idempotent so can be executed by default. Defaults to true. + type: boolean exposeConfig: default: false description: for internal use only - not for production use. diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 9f933a35..633ab1d0 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -944,8 +944,11 @@ fn build_server_rolegroup_statefulset( .context(GracefulShutdownSnafu)?; let mut airflow_container_args = Vec::new(); - airflow_container_args - .extend(airflow_role.get_commands(authentication_config, resolved_product_image)); + airflow_container_args.extend(airflow_role.get_commands( + airflow, + authentication_config, + resolved_product_image, + )); airflow_container .image_from_product_image(resolved_product_image) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 046eb5c6..2c52749e 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -247,6 +247,10 @@ pub mod versioned { #[serde(default)] pub load_examples: bool, + /// Whether to migrate/upgrade the database on start-up and add a named user. These operations are idempotent so can be executed by default. Defaults to true. + #[serde(default = "default_true")] + pub db_init: bool, + /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery). /// It must contain the key `ADDRESS` with the address of the Vector aggregator. /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator) @@ -278,6 +282,10 @@ pub mod versioned { } } +fn default_true() -> bool { + true +} + impl Default for v1alpha1::WebserverRoleConfig { fn default() -> Self { v1alpha1::WebserverRoleConfig { @@ -543,6 +551,7 @@ impl AirflowRole { /// if authentication is enabled. pub fn get_commands( &self, + airflow: &v1alpha1::AirflowCluster, auth_config: &AirflowClientAuthenticationDetailsResolved, resolved_product_image: &ResolvedProductImage, ) -> Vec { @@ -572,21 +581,27 @@ impl AirflowRole { "airflow api-server &".to_string(), ]); } - AirflowRole::Scheduler => command.extend(vec![ - "airflow db migrate".to_string(), - "airflow users create \ - --username \"$ADMIN_USERNAME\" \ - --firstname \"$ADMIN_FIRSTNAME\" \ - --lastname \"$ADMIN_LASTNAME\" \ - --email \"$ADMIN_EMAIL\" \ - --password \"$ADMIN_PASSWORD\" \ - --role \"Admin\"" - .to_string(), - "prepare_signal_handlers".to_string(), - container_debug_command(), - "airflow dag-processor &".to_string(), - "airflow scheduler &".to_string(), - ]), + AirflowRole::Scheduler => { + if airflow.spec.cluster_config.db_init { + command.extend(vec![ + "airflow db migrate".to_string(), + "airflow users create \ + --username \"$ADMIN_USERNAME\" \ + --firstname \"$ADMIN_FIRSTNAME\" \ + --lastname \"$ADMIN_LASTNAME\" \ + --email \"$ADMIN_EMAIL\" \ + --password \"$ADMIN_PASSWORD\" \ + --role \"Admin\"" + .to_string(), + ]) + } + command.extend(vec![ + "prepare_signal_handlers".to_string(), + container_debug_command(), + "airflow dag-processor &".to_string(), + "airflow scheduler &".to_string(), + ]); + } AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), @@ -977,5 +992,6 @@ mod tests { assert_eq!("KubernetesExecutor", cluster.spec.executor.to_string()); assert!(cluster.spec.cluster_config.load_examples); assert!(cluster.spec.cluster_config.expose_config); + assert!(cluster.spec.cluster_config.db_init); } } diff --git a/tests/templates/kuttl/db-init/00-patch-ns.yaml.j2 b/tests/templates/kuttl/db-init/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/db-init/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/db-init/05-assert.yaml b/tests/templates/kuttl/db-init/05-assert.yaml new file mode 100644 index 00000000..319e927a --- /dev/null +++ b/tests/templates/kuttl/db-init/05-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-postgresql +timeout: 480 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-postgresql +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/db-init/05-install-postgresql.yaml b/tests/templates/kuttl/db-init/05-install-postgresql.yaml new file mode 100644 index 00000000..dc25ba20 --- /dev/null +++ b/tests/templates/kuttl/db-init/05-install-postgresql.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-postgresql + --namespace $NAMESPACE + --version 16.4.2 + -f helm-bitnami-postgresql-values.yaml + oci://registry-1.docker.io/bitnamicharts/postgresql + timeout: 600 diff --git a/tests/templates/kuttl/db-init/06-cr-crb.yaml b/tests/templates/kuttl/db-init/06-cr-crb.yaml new file mode 100644 index 00000000..ca99c54c --- /dev/null +++ b/tests/templates/kuttl/db-init/06-cr-crb.yaml @@ -0,0 +1,28 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: airflow-xcom-clusterrole +rules: +- apiGroups: + - "" + resources: + - pods/exec + verbs: + - create + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: airflow-xcom-clusterrole-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: airflow-xcom-clusterrole +subjects: +- apiGroup: rbac.authorization.k8s.io + kind: Group + name: system:serviceaccounts diff --git a/tests/templates/kuttl/db-init/10-assert.yaml.j2 b/tests/templates/kuttl/db-init/10-assert.yaml.j2 new file mode 100644 index 00000000..8d585401 --- /dev/null +++ b/tests/templates/kuttl/db-init/10-assert.yaml.j2 @@ -0,0 +1,24 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-redis +timeout: 360 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-master +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-replicas +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/db-init/10-install-redis.yaml.j2 b/tests/templates/kuttl/db-init/10-install-redis.yaml.j2 new file mode 100644 index 00000000..aae9a14f --- /dev/null +++ b/tests/templates/kuttl/db-init/10-install-redis.yaml.j2 @@ -0,0 +1,13 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-redis + --namespace $NAMESPACE + --version 17.11.3 + -f helm-bitnami-redis-values.yaml + --repo https://charts.bitnami.com/bitnami redis + timeout: 600 +{% endif %} diff --git a/tests/templates/kuttl/db-init/20-assert.yaml.j2 b/tests/templates/kuttl/db-init/20-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/db-init/20-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/db-init/20-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/db-init/20-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/db-init/20-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/db-init/30-assert.yaml.j2 b/tests/templates/kuttl/db-init/30-assert.yaml.j2 new file mode 100644 index 00000000..960fd21e --- /dev/null +++ b/tests/templates/kuttl/db-init/30-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-cluster +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-webserver-default +status: + readyReplicas: 1 + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +status: + readyReplicas: 2 + replicas: 2 +{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-scheduler-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/db-init/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/db-init/30-install-airflow-cluster.yaml.j2 new file mode 100644 index 00000000..30674395 --- /dev/null +++ b/tests/templates/kuttl/db-init/30-install-airflow-cluster.yaml.j2 @@ -0,0 +1,154 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-airflow-db +timeout: 480 +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-airflow-credentials +type: Opaque +stringData: + adminUser.username: airflow + adminUser.firstname: Airflow + adminUser.lastname: Admin + adminUser.email: airflow@airflow.com + adminUser.password: airflow + connections.secretKey: thisISaSECRET_1234 + connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow +{% if test_scenario['values']['executor'] == 'celery' %} + connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow + connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0 +{% endif %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-cm-dag +data: + dag_factory.py: | + from airflow import DAG + from airflow.operators.empty import EmptyOperator + from datetime import datetime, timedelta + + # Number of DAGs to generate + NUM_DAGS = 1200 # Increase for more stress + DAG_PREFIX = "stress_dag_" + + default_args = { + 'owner': 'airflow', + 'start_date': datetime(2024, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(seconds=5), + } + + def create_dag(dag_id): + with DAG( + dag_id=dag_id, + default_args=default_args, + schedule=None, + catchup=False, + tags=["stress_test"], + ) as dag: + start = EmptyOperator(task_id='start') + end = EmptyOperator(task_id='end') + start >> end + return dag + + for i in range(NUM_DAGS): + dag_id = f"{DAG_PREFIX}{i:04d}" + globals()[dag_id] = create_dag(dag_id) +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + image: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + credentialsSecret: test-airflow-credentials + volumes: + - name: test-cm-dag + configMap: + name: test-cm-dag + volumeMounts: + - name: test-cm-dag + mountPath: /dags/dag_factory.py + subPath: dag_factory.py + webservers: + roleConfig: + listenerClass: external-unstable + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: &logging + airflow: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG + git-sync: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG + vector: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG + roleGroups: + default: + envOverrides: + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} + celeryExecutors: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: *logging + roleGroups: + default: + envOverrides: + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + replicas: 2 +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + envOverrides: + AIRFLOW__CORE__DAGS_FOLDER: "/dags" +{% endif %} + schedulers: + config: + gracefulShutdownTimeout: 10s + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + containers: *logging + roleGroups: + default: + envOverrides: + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + replicas: 1 diff --git a/tests/templates/kuttl/db-init/40-assert.yaml b/tests/templates/kuttl/db-init/40-assert.yaml new file mode 100644 index 00000000..6edaa3c3 --- /dev/null +++ b/tests/templates/kuttl/db-init/40-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-python +timeout: 240 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/db-init/40-install-airflow-python.yaml b/tests/templates/kuttl/db-init/40-install-airflow-python.yaml new file mode 100644 index 00000000..c3f865a0 --- /dev/null +++ b/tests/templates/kuttl/db-init/40-install-airflow-python.yaml @@ -0,0 +1,23 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python + labels: + app: test-airflow-python +spec: + replicas: 1 + selector: + matchLabels: + app: test-airflow-python + template: + metadata: + labels: + app: test-airflow-python + spec: + containers: + - name: test-airflow-python + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + stdin: true + tty: true diff --git a/tests/templates/kuttl/db-init/50-assert.yaml.j2 b/tests/templates/kuttl/db-init/50-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/db-init/50-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/db-init/50-health-check.yaml b/tests/templates/kuttl/db-init/50-health-check.yaml new file mode 100644 index 00000000..5d3b329f --- /dev/null +++ b/tests/templates/kuttl/db-init/50-health-check.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +timeout: 480 +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/health.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/db-init/70-assert.yaml.j2 b/tests/templates/kuttl/db-init/70-assert.yaml.j2 new file mode 100644 index 00000000..960fd21e --- /dev/null +++ b/tests/templates/kuttl/db-init/70-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-cluster +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-webserver-default +status: + readyReplicas: 1 + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +status: + readyReplicas: 2 + replicas: 2 +{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-scheduler-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/db-init/70-install-airflow-cluster-no-db-init.yaml.j2 b/tests/templates/kuttl/db-init/70-install-airflow-cluster-no-db-init.yaml.j2 new file mode 100644 index 00000000..9761b3b7 --- /dev/null +++ b/tests/templates/kuttl/db-init/70-install-airflow-cluster-no-db-init.yaml.j2 @@ -0,0 +1,13 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-airflow-db +timeout: 480 +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + clusterConfig: + dbInit: false diff --git a/tests/templates/kuttl/db-init/helm-bitnami-postgresql-values.yaml.j2 b/tests/templates/kuttl/db-init/helm-bitnami-postgresql-values.yaml.j2 new file mode 100644 index 00000000..12d7da16 --- /dev/null +++ b/tests/templates/kuttl/db-init/helm-bitnami-postgresql-values.yaml.j2 @@ -0,0 +1,24 @@ +--- +volumePermissions: + enabled: false + securityContext: + runAsUser: auto + +primary: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +shmVolume: + chmod: + enabled: false + +auth: + username: airflow + password: airflow + database: airflow diff --git a/tests/templates/kuttl/db-init/helm-bitnami-redis-values.yaml.j2 b/tests/templates/kuttl/db-init/helm-bitnami-redis-values.yaml.j2 new file mode 100644 index 00000000..ca93a983 --- /dev/null +++ b/tests/templates/kuttl/db-init/helm-bitnami-redis-values.yaml.j2 @@ -0,0 +1,29 @@ +--- +volumePermissions: + enabled: false + containerSecurityContext: + runAsUser: auto + +master: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +replica: + replicaCount: 1 + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +auth: + password: redis diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index d2860cd8..24a20dbf 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -90,6 +90,11 @@ tests: - airflow - openshift - executor + - name: db-init + dimensions: + - airflow-latest + - openshift + - executor suites: - name: nightly # Run nightly with the latest airflow