From 20d6c7fc0e292d2a7534e75ef326fed5c037d773 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Mon, 28 Aug 2023 16:46:15 -0500 Subject: [PATCH 1/2] SAASMLOPS-734 bytewax in-cluster config, custom labels, fix worker image deps Signed-off-by: James Crabtree --- .../bytewax/bytewax_materialization_engine.py | 20 +++++++++---------- setup.py | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index b222128bbbe..133634017b5 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -61,6 +61,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): include_security_context_capabilities: bool = True """ (optional) Include security context capabilities in the init and job container spec """ + labels: dict = {} + """ (optional) additional labels to append to kubernetes objects """ + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -82,7 +85,7 @@ def __init__( self.online_store = online_store # TODO: Configure k8s here - k8s_config.load_kube_config() + k8s_config.load_config() self.k8s_client = client.api_client.ApiClient() self.v1 = client.CoreV1Api(self.k8s_client) @@ -196,14 +199,13 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): {"paths": paths, "feature_view": feature_view.name} ) + labels = {"feast-bytewax-materializer": "configmap"} configmap_manifest = { "kind": "ConfigMap", "apiVersion": "v1", "metadata": { "name": f"feast-{job_id}", - "labels": { - "feast-bytewax-materializer": "configmap", - }, + "labels": {**labels, **self.batch_engine_config.labels}, }, "data": { "feature_store.yaml": feature_store_configuration, @@ -260,15 +262,15 @@ def _create_job_definition(self, job_id, namespace, pods, env): "drop": ["ALL"], } + job_labels = {"feast-bytewax-materializer": "job"} + pod_labels = {"feast-bytewax-materializer": "pod"} job_definition = { "apiVersion": "batch/v1", "kind": "Job", "metadata": { "name": f"dataflow-{job_id}", "namespace": namespace, - "labels": { - "feast-bytewax-materializer": "job", - }, + "labels": {**job_labels, **self.batch_engine_config.labels}, }, "spec": { "ttlSecondsAfterFinished": 3600, @@ -278,9 +280,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): "template": { "metadata": { "annotations": self.batch_engine_config.annotations, - "labels": { - "feast-bytewax-materializer": "pod", - }, + "labels": {**pod_labels, **self.batch_engine_config.labels}, }, "spec": { "restartPolicy": "Never", diff --git a/setup.py b/setup.py index f7b1ff04175..047100f03eb 100644 --- a/setup.py +++ b/setup.py @@ -98,7 +98,7 @@ "hiredis>=2.0.0,<3", ] -AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2"] +AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "s3fs"] BYTEWAX_REQUIRED = ["bytewax==0.15.1", "docker>=5.0.2", "kubernetes<=20.13.0"] From 2a275d36907d883abb7b994bd306ffd54ba21f13 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Mon, 11 Sep 2023 16:08:39 -0500 Subject: [PATCH 2/2] SAASMLOPS-769 make max parallelism configurable Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 133634017b5..21b7a5da1fa 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -64,6 +64,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): labels: dict = {} """ (optional) additional labels to append to kubernetes objects """ + max_parallelism: int = 10 + """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -275,7 +278,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): "spec": { "ttlSecondsAfterFinished": 3600, "completions": pods, - "parallelism": pods, + "parallelism": min(pods, self.batch_engine_config.max_parallelism), "completionMode": "Indexed", "template": { "metadata": {