diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 244000c6ded..1cddc0b8814 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -628,7 +628,14 @@ def on_demand_feature_view( if not _sources: raise ValueError("The `sources` parameter must be specified.") + def mainify(obj): + # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same + # name as the original file defining the ODFV. + if obj.__module__ != "__main__": + obj.__module__ = "__main__" + def decorator(user_function): + mainify(user_function) on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=_sources, diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 261e501a876..0e82fdf47ad 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -110,6 +110,7 @@ def parse_repo(repo_root: Path) -> RepoContents: request_feature_views=[], ) + data_sources_set = set() for repo_file in get_repo_files(repo_root): module_path = py_path_to_module(repo_file) module = importlib.import_module(module_path) @@ -119,6 +120,7 @@ def parse_repo(repo_root: Path) -> RepoContents: (obj is ds) for ds in res.data_sources ): res.data_sources.append(obj) + data_sources_set.add(obj) if isinstance(obj, FeatureView) and not any( (obj is fv) for fv in res.feature_views ): @@ -126,7 +128,10 @@ def parse_repo(repo_root: Path) -> RepoContents: if isinstance(obj.stream_source, PushSource) and not any( (obj is ds) for ds in res.data_sources ): - res.data_sources.append(obj.stream_source.batch_source) + push_source_dep = obj.stream_source.batch_source + # Don't add if the push source's batch source is a duplicate of an existing batch source + if push_source_dep not in data_sources_set: + res.data_sources.append(push_source_dep) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ):