diff --git a/infra/charts/feast-spark/charts/feast-jobservice/templates/configmap.yaml b/infra/charts/feast-spark/charts/feast-jobservice/templates/configmap.yaml
index 356358fd..99084a9f 100644
--- a/infra/charts/feast-spark/charts/feast-jobservice/templates/configmap.yaml
+++ b/infra/charts/feast-spark/charts/feast-jobservice/templates/configmap.yaml
@@ -13,4 +13,10 @@ metadata:
 data:
   jobTemplate.yaml: |
 {{- toYaml .Values.sparkOperator.jobTemplate | nindent 4 }}
+  batchJobTemplate.yaml: |
+{{- toYaml .Values.sparkOperator.batchJobTemplate | nindent 4 }}
+  streamJobTemplate.yaml: |
+{{- toYaml .Values.sparkOperator.streamJobTemplate | nindent 4 }}
+  historicalJobTemplate.yaml: |
+{{- toYaml .Values.sparkOperator.historicalJobTemplate | nindent 4 }}
 {{- end }}
\ No newline at end of file
diff --git a/infra/charts/feast-spark/charts/feast-jobservice/templates/deployment.yaml b/infra/charts/feast-spark/charts/feast-jobservice/templates/deployment.yaml
index c75a1617..8952bb93 100644
--- a/infra/charts/feast-spark/charts/feast-jobservice/templates/deployment.yaml
+++ b/infra/charts/feast-spark/charts/feast-jobservice/templates/deployment.yaml
@@ -76,6 +76,12 @@ spec:
         {{- if .Values.sparkOperator.enabled }}
         - name: FEAST_SPARK_K8S_JOB_TEMPLATE_PATH
           value: /etc/configs/jobTemplate.yaml
+        - name: SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH
+          value: /etc/configs/batchJobTemplate.yaml
+        - name: SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH
+          value: /etc/configs/streamJobTemplate.yaml
+        - name: SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH
+          value: /etc/configs/historicalJobTemplate.yaml
         {{- end }}
         {{- range $key, $value := .Values.envOverrides }}
        - name: {{ printf "%s" $key | replace "." "_" | upper | quote }}
diff --git a/infra/charts/feast-spark/charts/feast-jobservice/values.yaml b/infra/charts/feast-spark/charts/feast-jobservice/values.yaml
index c069211a..ac1b690d 100644
--- a/infra/charts/feast-spark/charts/feast-jobservice/values.yaml
+++ b/infra/charts/feast-spark/charts/feast-jobservice/values.yaml
@@ -26,6 +26,10 @@ sparkOperator:
   enabled: false
   # sparkOperator.jobTemplate -- Content of the job template, in yaml format
   jobTemplate: {}
+  # specialized job templates by job types
+  batchJobTemplate: {}
+  streamJobTemplate: {}
+  historicalJobTemplate: {}
 
 prometheus:
   # prometheus.enabled -- Flag to enable scraping of metrics
diff --git a/python/feast_spark/constants.py b/python/feast_spark/constants.py
index 8ea25ec7..7c556bce 100644
--- a/python/feast_spark/constants.py
+++ b/python/feast_spark/constants.py
@@ -93,6 +93,15 @@ class ConfigOptions(metaclass=ConfigMeta):
     # SparkApplication resource template
     SPARK_K8S_JOB_TEMPLATE_PATH = None
 
+    # SparkApplication resource template for Batch Ingestion Jobs
+    SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH: Optional[str] = ""
+
+    # SparkApplication resource template for Stream Ingestion Jobs
+    SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH: Optional[str] = ""
+
+    # SparkApplication resource template for Historical Retrieval Jobs
+    SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = ""
+
     #: File format of historical retrieval features
     HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"
 
diff --git a/python/feast_spark/pyspark/launcher.py b/python/feast_spark/pyspark/launcher.py
index 9f5e95ac..98483a0f 100644
--- a/python/feast_spark/pyspark/launcher.py
+++ b/python/feast_spark/pyspark/launcher.py
@@ -73,7 +73,16 @@ def _k8s_launcher(config: Config) -> JobLauncher:
 
     return k8s.KubernetesJobLauncher(
         namespace=config.get(opt.SPARK_K8S_NAMESPACE),
-        resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH, None),
+        generic_resource_template_path=config.get(opt.SPARK_K8S_JOB_TEMPLATE_PATH),
+        batch_ingestion_resource_template_path=config.get(
+            opt.SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH, None
+        ),
+        stream_ingestion_resource_template_path=config.get(
+            opt.SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH, None
+        ),
+        historical_retrieval_resource_template_path=config.get(
+            opt.SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH, None
+        ),
         staging_location=staging_location,
         incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
         staging_client=get_staging_client(staging_uri.scheme, config),
diff --git a/python/feast_spark/pyspark/launchers/k8s/k8s.py b/python/feast_spark/pyspark/launchers/k8s/k8s.py
index 14a136f7..e462cafa 100644
--- a/python/feast_spark/pyspark/launchers/k8s/k8s.py
+++ b/python/feast_spark/pyspark/launchers/k8s/k8s.py
@@ -51,7 +51,10 @@
 )
 
 
-def _load_resource_template(job_template_path: Path) -> Dict[str, Any]:
+def _load_resource_template(job_template_path: Optional[str]) -> Dict[str, Any]:
+    if not job_template_path or not Path(job_template_path).exists():
+        return {}
+
     with open(job_template_path, "rt") as f:
         return yaml.safe_load(f)
 
@@ -189,7 +192,10 @@ def __init__(
         namespace: str,
         incluster: bool,
         staging_location: str,
-        resource_template_path: Optional[Path],
+        generic_resource_template_path: Optional[str],
+        batch_ingestion_resource_template_path: Optional[str],
+        stream_ingestion_resource_template_path: Optional[str],
+        historical_retrieval_resource_template_path: Optional[str],
         staging_client: AbstractStagingClient,
         azure_account_name: str,
         azure_account_key: str,
@@ -200,10 +206,26 @@ def __init__(
         self._staging_client = staging_client
         self._azure_account_name = azure_account_name
         self._azure_account_key = azure_account_key
-        if resource_template_path is not None:
-            self._resource_template = _load_resource_template(resource_template_path)
-        else:
-            self._resource_template = yaml.safe_load(DEFAULT_JOB_TEMPLATE)
+
+        generic_template = _load_resource_template(
+            generic_resource_template_path
+        ) or yaml.safe_load(DEFAULT_JOB_TEMPLATE)
+
+        self._batch_ingestion_template = (
+            _load_resource_template(batch_ingestion_resource_template_path)
+            or generic_template
+        )
+
+        self._stream_ingestion_template = (
+            _load_resource_template(stream_ingestion_resource_template_path)
+            or generic_template
+        )
+
+        self._historical_retrieval_template = (
+            _load_resource_template(historical_retrieval_resource_template_path)
+            or generic_template
+        )
+
         self._scheduled_resource_template = yaml.safe_load(
             DEFAULT_SCHEDULED_JOB_TEMPLATE
         )
@@ -281,7 +303,7 @@ def historical_feature_retrieval(
         job_id = _generate_job_id()
 
         resource = _prepare_job_resource(
-            job_template=self._resource_template,
+            job_template=self._historical_retrieval_template,
             job_id=job_id,
             job_type=HISTORICAL_RETRIEVAL_JOB_TYPE,
             main_application_file=pyspark_script_path,
@@ -341,7 +363,7 @@ def offline_to_online_ingestion(
         job_id = _generate_job_id()
 
         resource = _prepare_job_resource(
-            job_template=self._resource_template,
+            job_template=self._batch_ingestion_template,
             job_id=job_id,
             job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
             main_application_file=jar_s3_path,
@@ -394,7 +416,7 @@ def schedule_offline_to_online_ingestion(
             scheduled_job_template=self._scheduled_resource_template,
             scheduled_job_id=schedule_job_id,
             job_schedule=ingestion_job_params.get_job_schedule(),
-            job_template=self._resource_template,
+            job_template=self._batch_ingestion_template,
             job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
             main_application_file=jar_s3_path,
             main_class=ingestion_job_params.get_class_name(),
@@ -454,7 +476,7 @@ def start_stream_to_online_ingestion(
         job_id = _generate_job_id()
 
         resource = _prepare_job_resource(
-            job_template=self._resource_template,
+            job_template=self._stream_ingestion_template,
            job_id=job_id,
            job_type=STREAM_TO_ONLINE_JOB_TYPE,
            main_application_file=jar_s3_path,