diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index 36b7184be..e04e95cd0 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -1,7 +1,7 @@ package com.linkedin.feathr.offline.client import com.linkedin.feathr.common.exception._ -import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName} +import com.linkedin.feathr.common.{FeatureInfo, FeatureTypeConfig, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName} import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig} import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator} @@ -12,8 +12,10 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.swa.SWAHandler +import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.util._ import org.apache.logging.log4j.LogManager +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} @@ -310,6 +312,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val shouldAddDefault = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean + var leftRenamed = left val featureToPathsMap = (for { requiredFeature <- logicalPlan.allRequiredFeatures featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName) @@ -323,6 +327,21 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater() .getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2) logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures) + } else if (shouldAddDefault) { + val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater() + .excludeMissingAnchoredFeatures(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2) + val missingAnchoredFeatures = updatedFeatureGroups.allAnchoredFeatures.filterNot { case (k, _) => + featureGroupsWithoutInvalidFeatures.allAnchoredFeatures.contains(k) } + val defaults = missingAnchoredFeatures.flatMap(s => s._2.featureAnchor.defaults) + val featureTypes = missingAnchoredFeatures + .map(x => Some(x._2.featureAnchor.featureTypeConfigs)) + .foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig])) + val updatedLeft = missingAnchoredFeatures.keys.foldLeft(left) { (left, featureName) => + left.withColumn(featureName, lit(null)) + } + leftRenamed = + substituteDefaults(updatedLeft, 
missingAnchoredFeatures.keys.toSeq, defaults, featureTypes, sparkSession) + logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures) } else { throw new FeathrInputDataException( ErrorLabel.FEATHR_USER_ERROR, @@ -358,7 +377,6 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: val renameFeatures = conflictsAutoCorrectionSetting.get.renameFeatureList val suffix = conflictsAutoCorrectionSetting.get.suffix log.warn(s"Found conflicted field names: ${conflictFeatureNames}. Will auto correct them by applying suffix: ${suffix}") - var leftRenamed = left conflictFeatureNames.foreach(name => { leftRenamed = leftRenamed.withColumnRenamed(name, name+'_'+suffix) }) @@ -369,7 +387,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: s"Failed to apply auto correction to solve conflicts. Still got conflicts after applying provided suffix ${suffix} for fields: " + s"${conflictFeatureNames}. Please provide another suffix or solve conflicts manually.") } - val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed, rowBloomFilterThreshold) + val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed, + rowBloomFilterThreshold) if(renameFeatures) { log.warn(s"Suffix :${suffix} is applied into feature names: ${conflictFeatureNames} to avoid conflicts in outputs") renameFeatureNames(df, header, conflictFeatureNames, suffix) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index 2da54373e..40e49387a 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -110,6 +110,33 @@ private[offline] class FeatureGroupsUpdater { (updatedFeatureGroups, updatedKeyTaggedFeatures) } + /** + * Update the feature groups (for Feature join) based on feature missing features. Few anchored/SWA features can be missing if the feature data + * is not present. Remove those anchored features, and also the corresponding derived feature which are dependent on it. + * + * @param featureGroups Original feature groups + * @param allAnchoredFeaturesWithData all anchored features which were not skipped because of missing data + * @param skippedSwaFeatures skipped SWA features due to missing data issue. + * @param keyTaggedFeatures List of all key tagged features + * @return Updated feature groups and updated list of joining features + */ + def removeMissingAnchoredFeatures(featureGroups: FeatureGroups, allAnchoredFeaturesWithData: Seq[String], skippedSwaFeatures: Seq[String], + keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { + // Filter out all the window agg features that were skipped. + val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => !skippedSwaFeatures.contains(windowAggFeature._1)) + // We need to add the window agg features to it as they are also considered anchored features. 
+ val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => + allAnchoredFeaturesWithData.contains(featureRow._1)) ++ updatedWindowAggFeatures ++ featureGroups.allPassthroughFeatures + + log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)},") + val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, featureGroups.allDerivedFeatures, updatedWindowAggFeatures, + featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) + || featureGroups.allDerivedFeatures.contains(feature.featureName) || updatedWindowAggFeatures.contains(feature.featureName) + || featureGroups.allPassthroughFeatures.contains(feature.featureName) || featureGroups.allSeqJoinFeatures.contains(feature.featureName)) + (updatedFeatureGroups, updatedKeyTaggedFeatures) + } + /** * Update the feature groups (for Feature join) based on feature missing features. Few anchored/SWA features can be missing if the feature data * is not present. Remove those anchored features, and also the corresponding derived feature which are dependent on it. @@ -194,6 +221,25 @@ private[offline] class FeatureGroupsUpdater { featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, updatedSeqJoinFeatures) } + /** + * Exclude anchored missing anchored features from the feature groups. This API does not remove the features which are dependent + * on this feature. + * + * @param featureToPathsMap Map of anchored feature names to their paths + * @param featureGroups All feature groups + * @param invalidPaths List of all invalid paths + * @return + */ + def excludeMissingAnchoredFeatures(featureToPathsMap: Map[String, String], featureGroups: FeatureGroups, invalidPaths: Seq[String]): FeatureGroups = { + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureNameToAnchoredObject => { + !invalidPaths.contains(featureToPathsMap(featureNameToAnchoredObject._1)) + }) + + log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)},") + FeatureGroups(updatedAnchoredFeatures, featureGroups.allDerivedFeatures, + featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + } + } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index 11d657c47..29f2e018c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -23,6 +23,7 @@ import com.linkedin.feathr.offline.util.FeathrUtils import com.linkedin.feathr.offline.util.datetime.DateTimeInterval import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame} import org.apache.logging.log4j.LogManager +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.util.sketch.BloomFilter @@ -196,6 +197,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d .map(featureGroups.allAnchoredFeatures), failOnMissingPartition) val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val 
shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) @@ -203,6 +205,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d val (FeatureDataFrame(withWindowAggFeatureDF, inferredSWAFeatureTypes), skippedFeatures) = joinSWAFeatures(ss, obsToJoinWithFeatures, joinConfig, featureGroups, failOnMissingPartition, bloomFilters, swaObsTime, swaHandler) + var updatedObs = observationDF // Update the feature groups based on the missing features. Certain SWA features can be skipped because of missing data issue, we need to skip // the corresponding derived, seq join features which could depend on this SWA feature. val (updatedFeatureGroups, updatedLogicalPlan) = if (shouldSkipFeature) { @@ -211,7 +214,24 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d val newLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(newFeatureGroups, newKeyTaggedFeatures) (newFeatureGroups, newLogicalPlan) - } else (featureGroups, logicalPlan) + } /*else if (shouldAddDefaultCol) { + val (newFeatureGroups, newKeyTaggedFeatures) = FeatureGroupsUpdater().removeMissingAnchoredFeatures(featureGroups, + updatedSourceAccessorMap.keySet.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.features).toSeq, skippedFeatures, keyTaggedFeatures) + val missingAnchoredFeatures = featureGroups.allAnchoredFeatures.filterNot { case (k, _) => + featureGroups.allAnchoredFeatures.contains(k) + } + val defaults = missingAnchoredFeatures.flatMap(s => s._2.featureAnchor.defaults) + val featureTypes = missingAnchoredFeatures + .map(x => Some(x._2.featureAnchor.featureTypeConfigs)) + .foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig])) + val updatedObsDf = missingAnchoredFeatures.keys.foldLeft(observationDF) { (observationDF, featureName) => + observationDF.withColumn(featureName, lit(null)) + } + updatedObs = + substituteDefaults(updatedObsDf, missingAnchoredFeatures.keys.toSeq, defaults, featureTypes, ss) + val newLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(featureGroups, keyTaggedFeatures) + (newFeatureGroups, newLogicalPlan) + }*/ else (featureGroups, logicalPlan) implicit val joinExecutionContext: JoinExecutionContext = JoinExecutionContext(ss, updatedLogicalPlan, updatedFeatureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs)) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala index 578de839d..d98782a72 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala @@ -1,7 +1,7 @@ package com.linkedin.feathr.offline.join.workflow import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrFeatureJoinException} -import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig} +import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig, FeatureTypes} import com.linkedin.feathr.offline import com.linkedin.feathr.offline.FeatureDataFrame import 
com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource @@ -12,15 +12,16 @@ import com.linkedin.feathr.offline.job.KeyedTransformedResult import com.linkedin.feathr.offline.join._ import com.linkedin.feathr.offline.join.algorithms._ import com.linkedin.feathr.offline.join.util.FrequentItemEstimatorFactory +import com.linkedin.feathr.offline.logical.{LogicalPlan, MultiStageJoinPlan} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.transformation.DataFrameExt._ -import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils} +import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils} import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint import org.apache.logging.log4j.LogManager import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions.{col, lit} /** * An abstract class provides default implementation of anchored feature join step @@ -39,8 +40,62 @@ private[offline] class AnchoredFeatureJoinStep( extends FeatureJoinStep[AnchorJoinStepInput, DataFrameJoinStepOutput] { @transient lazy val log = LogManager.getLogger(getClass.getName) + /** + * When the add.default.col.for.missing.data flag is turned, some features could be skipped because of missing data. + * For such anchored features, we will add a feature column with a configured default value (if present in the feature anchor) or + * a null value column. + * @param sparkSession spark session + * @param dataframe the original observation dataframe + * @param logicalPlan logical plan generated using the join config + * @param missingFeatures Map of missing feature names to the corresponding featureAnchorWithSource object. + * @return Dataframe with the missing feature columns added + */ + def substituteDefaultsForDataMissingFeatures(sparkSession: SparkSession, dataframe: DataFrame, logicalPlan: MultiStageJoinPlan, + missingFeatures: Map[String, FeatureAnchorWithSource]): DataFrame = { + // Create a map of feature name to corresponding defaults + val defaults = missingFeatures.flatMap(s => s._2.featureAnchor.defaults) + + // Create a map of feature to their feature type if configured. + val featureTypes = missingFeatures + .map(x => Some(x._2.featureAnchor.featureTypeConfigs)) + .foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig])) + + // We try to guess the column data type from the configured feature type. If feature type is not present, we will default to + // default feathr behavior of returning a map column of string to float. 
+ val updatedDf = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) => + val featureColumnType = if (featureTypes.contains(featureName)) { + featureTypes(featureName).getFeatureType match { + case FeatureTypes.NUMERIC => "float" + case FeatureTypes.BOOLEAN => "boolean" + case FeatureTypes.DENSE_VECTOR => "array" + case FeatureTypes.CATEGORICAL => "string" + case FeatureTypes.CATEGORICAL_SET => "array" + case FeatureTypes.TERM_VECTOR => "map" + case FeatureTypes.UNSPECIFIED => "map" + case _ => "map" + } + } else { // feature type is not configured + "map" + } + observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType)) + } + + val dataframeWithDefaults = substituteDefaults(updatedDf, missingFeatures.keys.toSeq, defaults, featureTypes, + sparkSession, (s: String) => s"${FEATURE_NAME_PREFIX}$s") + + // We want to duplicate this column with the correct feathr supported feature name which is required for further processing. + // For example, if feature name is abc and the corresponding key is x, the column name would be __feathr_feature_abc_x. + missingFeatures.keys.foldLeft(dataframeWithDefaults) { (dataframeWithDefaults, featureName) => + val keyTags = logicalPlan.joinStages.filter(kv => kv._2.contains(featureName)).head._1 + val keyStr = keyTags.map(logicalPlan.keyTagIntsToStrings).toList + dataframeWithDefaults.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName, Some(keyStr)), + col(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName))) + } + } + /** * Join anchored features to the observation passed as part of the input context. + * * @param features Non-window aggregation, basic anchored features. * @param input input context for this step. * @param ctx environment variable that contains join job execution context. 
@@ -49,10 +104,47 @@ private[offline] class AnchoredFeatureJoinStep( override def joinFeatures(features: Seq[ErasedEntityTaggedFeature], input: AnchorJoinStepInput)( implicit ctx: JoinExecutionContext): FeatureDataFrameOutput = { val AnchorJoinStepInput(observationDF, anchorDFMap) = input + var updatedObs = observationDF + val withMissingFeaturesSubstituted = substituteDefaultsForDataMissingFeatures() + val missingFeatures = features.map(x => x.getFeatureName).filter(x => { + val containsFeature :Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq + containsFeature.contains(false) + }) + val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1)) + val defaults = missingAnchoredFeatures.flatMap(s => s._2.featureAnchor.defaults) + val featureTypes = missingAnchoredFeatures + .map(x => Some(x._2.featureAnchor.featureTypeConfigs)) + .foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig])) + val updatedObsDf = missingAnchoredFeatures.keys.foldLeft(observationDF) { (observationDF, featureName) => + val featureColumnType = if (featureTypes.contains(featureName)) { + featureTypes(featureName).getFeatureType match { + case FeatureTypes.NUMERIC => "float" + case FeatureTypes.BOOLEAN => "boolean" + case FeatureTypes.DENSE_VECTOR => "array" + case FeatureTypes.CATEGORICAL => "string" + case FeatureTypes.CATEGORICAL_SET => "array" + case FeatureTypes.TERM_VECTOR => "map" + case FeatureTypes.UNSPECIFIED => "map" + case _ => "map" + } + } else { + "map" + } + observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType)) + } + updatedObs = substituteDefaults(updatedObsDf, missingAnchoredFeatures.keys.toSeq, defaults, featureTypes, ctx.sparkSession, (s: String) => s"${FEATURE_NAME_PREFIX}$s") + updatedObs = missingAnchoredFeatures.keys.foldLeft(updatedObs) { (updatedObs, featureName) => + val keyTags = ctx.logicalPlan.joinStages.filter(kv => kv._2.contains(featureName)).head._1 + val keyStr = keyTags.map(ctx.logicalPlan.keyTagIntsToStrings).toList + updatedObs.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName, Some(keyStr)), + col(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName))) + } + missingAnchoredFeatures.map(x => x._2.featureAnchor.sourceKeyExtractor.getKeyColumnNames()) + val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures val joinStages = ctx.logicalPlan.joinStages val joinOutput = joinStages - .foldLeft(FeatureDataFrame(observationDF, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => { + .foldLeft(FeatureDataFrame(updatedObs, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => { val (keyTags: Seq[Int], featureNames: Seq[String]) = joinStage val FeatureDataFrame(contextDF, inferredFeatureTypeMap) = accFeatureDataFrame // map feature name to its transformed dataframe and the join key of the dataframe diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 5aeeeac0a..bae2c64ef 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ 
b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -37,6 +37,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH requiredFeatureAnchors: Seq[FeatureAnchorWithSource], failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = { val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean + // get a Map from each source to a list of all anchors based on this source val sourceToAnchor = requiredFeatureAnchors .map(anchor => (anchor.source, anchor)) @@ -74,7 +76,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH // If it is a nonTime based source, we will load the dataframe at runtime, however this is too late to decide if the // feature should be skipped. So, we will try to take the first row here, and see if it succeeds. - if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && shouldSkipFeature) { + if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || shouldAddDefaultCol)) { if (dataSource.get().take(1).isEmpty) None else { Some(dataSource) } @@ -82,7 +84,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH Some(dataSource) } } catch { - case e: Exception => if (shouldSkipFeature) None else throw e + case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e } anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 15509171b..7999d8d28 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -7,11 +7,11 @@ import com.linkedin.feathr.offline.generation.SparkIOUtils import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader} import com.linkedin.feathr.offline.util.FeathrTestUtils -import com.linkedin.feathr.offline.util.FeathrUtils.{SKIP_MISSING_FEATURE, setFeathrJobParam} +import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam} import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ -import org.testng.Assert.assertTrue +import org.testng.Assert.{assertEquals, assertTrue} import org.testng.annotations.{BeforeClass, Test} import scala.collection.mutable @@ -386,6 +386,146 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { setFeathrJobParam(SKIP_MISSING_FEATURE, "false") } + /* + * Test skipping combination of anchored, derived and swa features. Also, test it with different default value types. 
+ */ + @Test + def testAddDefaultForMissingAnchoredFeatures: Unit = { + setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true") + val df = runLocalFeatureJoinForTest( + joinConfigAsString = + """ + |settings: { + | joinTimeSettings: { + | timestampColumn: { + | def: "timestamp" + | format: "yyyy-MM-dd" + | } + | simulateTimeDelay: 1d + | } + |} + | + | features: { + | key: a_id + | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4", + | "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", + | "aEmbedding", "memberEmbeddingAutoTZ"] + | } + """.stripMargin, + featureDefAsString = + """ + | sources: { + | swaSource: { + | location: { path: "generaion/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + | swaSource1: { + | location: { path: "generation/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + | anchors: { + | anchor1: { + | source: "anchorAndDerivations/nullVaueSource.avro.json" + | key: "toUpperCaseExt(mId)" + | features: { + | featureWithNull: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: NUMERIC + | default: -1 + | } + | featureWithNull3: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: CATEGORICAL + | default: "null" + | } + | featureWithNull4: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: TERM_VECTOR + | default: {} + | } + | featureWithNull6: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: DENSE_VECTOR + | default: [1, 2, 3] + | } + | featureWithNull5: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | default: 1 + | } + | } + | } + | + | anchor2: { + | source: "anchorAndDerivations/nullValueSource.avro.json" + | key: "toUpperCaseExt(mId)" + | features: { + | featureWithNull2: "isPresent(value) ? 
toNumeric(value) : 0" + | } + | } + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | aEmbedding: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | default: 2 + | } + | } + | } + | swaAnchor1: { + | source: "swaSource1" + | key: "x" + | features: { + | memberEmbeddingAutoTZ: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | type: { + | type: TENSOR + | tensorCategory: SPARSE + | dimensionType: [INT] + | valType: FLOAT + | } + | } + | } + | } + |} + |derivations: { + | + | derived_featureWithNull: "featureWithNull * 2" + | derived_featureWithNull2: "featureWithNull2 * 2" + |} + """.stripMargin, + observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv") + + df.data.show() + val featureList = df.data.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("a_id") else "null") + assertEquals(featureList(0).getAs[Row]("aEmbedding"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) + assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null") + assertEquals(featureList(0).getAs[Row]("featureWithNull5"), mutable.Map("" -> 1.0f)) + assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f) + assertEquals(featureList(0).getAs[Row]("featureWithNull4"),Map()) + assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f) + assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f)))) + assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) + setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") + } + /* * Test features with fdsExtract. 
*/ diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index f7ef08739..d67139f48 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -28,133 +28,133 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { def testLocalAnchorSWATest: Unit = { val df = runLocalFeatureJoinForTest( joinConfigAsString = """ - | settings: { - | observationDataTimeSettings: { - | absoluteTimeRange: { - | startTime: "2018-05-01" - | endTime: "2018-05-03" - | timeFormat: "yyyy-MM-dd" - | } - | } - | joinTimeSettings: { - | timestampColumn: { - | def: timestamp - | format: "yyyy-MM-dd" - | } - | } - |} - | - |features: [ - | { - | key: [x], - | featureList: ["f1", "f1Sum", "f2", "f1f1"] - | }, - | { - | key: [x, y] - | featureList: ["f3", "f4"] - | } - |] + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | startTime: "2018-05-01" + | endTime: "2018-05-03" + | timeFormat: "yyyy-MM-dd" + | } + | } + | joinTimeSettings: { + | timestampColumn: { + | def: timestamp + | format: "yyyy-MM-dd" + | } + | } + |} + | + |features: [ + | { + | key: [x], + | featureList: ["f1", "f1Sum", "f2", "f1f1"] + | }, + | { + | key: [x, y] + | featureList: ["f3", "f4"] + | } + |] """.stripMargin, featureDefAsString = """ - |sources: { - | ptSource: { - | type: "PASSTHROUGH" - | } - | swaSource: { - | location: { path: "slidingWindowAgg/localSWAAnchorTestFeatureData/daily" } - | timePartitionPattern: "yyyy/MM/dd" - | timeWindowParameters: { - | timestampColumn: "timestamp" - | timestampColumnFormat: "yyyy-MM-dd" - | } - | } - |} - | - |anchors: { - | ptAnchor: { - | source: "ptSource" - | key: "x" - | features: { - | f1f1: { - | def: "([$.term:$.value] in passthroughFeatures if $.name == 'f1f1')" - | } - | } - | } - | swaAnchor: { - | source: "swaSource" - | key: "substring(x, 0)" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } - | features: { - | f1: { - | def: "feature.col.value" - | filter: "feature.col.name = 'f1'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d - | } - | } - | } - | - | swaAnchor2: { - | source: "swaSource" - | key: "x" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } - | features: { - | f1Sum: { - | def: "feature.col.value" - | filter: "feature.col.name = 'f1'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d - | } - | } - | } - | swaAnchorWithKeyExtractor: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" - | features: { - | f3: { - | def: "aggregationWindow" - | aggregation: SUM - | window: 3d - | } - | } - | } - | swaAnchorWithKeyExtractor2: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" - | features: { - | f4: { - | def: "aggregationWindow" - | aggregation: SUM - | window: 3d - | } - | } - | } - | swaAnchorWithKeyExtractor3: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor2" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } - | features: { - | f2: { - | def: "feature.col.value" - | 
filter: "feature.col.name = 'f2'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d - | } - | } - | } - |} + |sources: { + | ptSource: { + | type: "PASSTHROUGH" + | } + | swaSource: { + | location: { path: "slidingWindowAgg/localSWAAnchorTestFeatureData/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | ptAnchor: { + | source: "ptSource" + | key: "x" + | features: { + | f1f1: { + | def: "([$.term:$.value] in passthroughFeatures if $.name == 'f1f1')" + | } + | } + | } + | swaAnchor: { + | source: "swaSource" + | key: "substring(x, 0)" + | lateralViewParameters: { + | lateralViewDef: explode(features) + | lateralViewItemAlias: feature + | } + | features: { + | f1: { + | def: "feature.col.value" + | filter: "feature.col.name = 'f1'" + | aggregation: SUM + | groupBy: "feature.col.term" + | window: 3d + | } + | } + | } + | + | swaAnchor2: { + | source: "swaSource" + | key: "x" + | lateralViewParameters: { + | lateralViewDef: explode(features) + | lateralViewItemAlias: feature + | } + | features: { + | f1Sum: { + | def: "feature.col.value" + | filter: "feature.col.name = 'f1'" + | aggregation: SUM + | groupBy: "feature.col.term" + | window: 3d + | } + | } + | } + | swaAnchorWithKeyExtractor: { + | source: "swaSource" + | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" + | features: { + | f3: { + | def: "aggregationWindow" + | aggregation: SUM + | window: 3d + | } + | } + | } + | swaAnchorWithKeyExtractor2: { + | source: "swaSource" + | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" + | features: { + | f4: { + | def: "aggregationWindow" + | aggregation: SUM + | window: 3d + | } + | } + | } + | swaAnchorWithKeyExtractor3: { + | source: "swaSource" + | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor2" + | lateralViewParameters: { + | lateralViewDef: explode(features) + | lateralViewItemAlias: feature + | } + | features: { + | f2: { + | def: "feature.col.value" + | filter: "feature.col.name = 'f2'" + | aggregation: SUM + | groupBy: "feature.col.term" + | window: 3d + | } + | } + | } + |} """.stripMargin, "slidingWindowAgg/localAnchorTestObsData.avro.json").data df.show() @@ -759,56 +759,56 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { def testLocalAnchorSWASimulateTimeDelay(delay: String): Unit = { val res = runLocalFeatureJoinForTest( joinConfigAsString = s""" - | settings: { - | observationDataTimeSettings: { - | absoluteTimeRange: { - | timeFormat: yyyy-MM-dd - | startTime: "2018-05-01" - | endTime: "2018-05-03" - | } - | } - | joinTimeSettings: { - | timestampColumn: { - | def: timestamp - | format: yyyy-MM-dd - | } - | simulateTimeDelay: ${delay} - | } - |} - | - |features: [ - | - | { - | key: [x], - | featureList: ["simpleFeature"] - | } - |] + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | timeFormat: yyyy-MM-dd + | startTime: "2018-05-01" + | endTime: "2018-05-03" + | } + | } + | joinTimeSettings: { + | timestampColumn: { + | def: timestamp + | format: yyyy-MM-dd + | } + | simulateTimeDelay: ${delay} + | } + |} + | + |features: [ + | + | { + | key: [x], + | featureList: ["simpleFeature"] + | } + |] """.stripMargin, featureDefAsString = """ - |sources: { - | swaSource: { - | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } - | 
timePartitionPattern: "yyyy/MM/dd" - | timeWindowParameters: { - | timestampColumn: "timestamp" - | timestampColumnFormat: "yyyy-MM-dd" - | } - | } - |} - | - |anchors: { - | swaAnchor: { - | source: "swaSource" - | key: "x" - | features: { - | simpleFeature: { - | def: "aggregationWindow" - | aggregation: COUNT - | window: 3d - | } - | } - | } - |} + |sources: { + | swaSource: { + | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | simpleFeature: { + | def: "aggregationWindow" + | aggregation: COUNT + | window: 3d + | } + | } + | } + |} """.stripMargin, observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data @@ -823,77 +823,77 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { def testLocalAnchorSWAWithMultipleSettingsConfigTest: Unit = { val res = runLocalFeatureJoinForTest( joinConfigAsString = s""" - | settings: { - | observationDataTimeSettings: { - | absoluteTimeRange: { - | timeFormat: yyyy-MM-dd - | startTime: 2018-05-01 - | endTime: 2018-05-03 - | } - | } - | joinTimeSettings: { - | timestampColumn: { - | def: timestamp - | format: yyyy-MM-dd - | } - | simulateTimeDelay: 7d - | } - |} - | - |features: [ - | - | { - | key: [x], - | featureList: ["simpleFeature", "likesFeature"] - | }, - | { - | key: [x], - | featureList: ["commentsFeature"] - | overrideTimeDelay: 8d - | } - |] + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | timeFormat: yyyy-MM-dd + | startTime: 2018-05-01 + | endTime: 2018-05-03 + | } + | } + | joinTimeSettings: { + | timestampColumn: { + | def: timestamp + | format: yyyy-MM-dd + | } + | simulateTimeDelay: 7d + | } + |} + | + |features: [ + | + | { + | key: [x], + | featureList: ["simpleFeature", "likesFeature"] + | }, + | { + | key: [x], + | featureList: ["commentsFeature"] + | overrideTimeDelay: 8d + | } + |] """.stripMargin, featureDefAsString = """ - |sources: { - | swaSource: { - | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } - | timePartitionPattern: "yyyy/MM/dd" - | timeWindowParameters: { - | timestampColumn: "timestamp" - | timestampColumnFormat: "yyyy-MM-dd" - | } - | } - |} - | - |anchors: { - | swaAnchor: { - | source: "swaSource" - | key: "x" - | features: { - | simpleFeature: { - | def: "aggregationWindow" - | aggregation: COUNT - | window: 3d - | } - | likesFeature: { - | def: "foo" - | aggregation: COUNT - | window: 3d - | } - | } - | } - | swaAnchor2: { - | source: "swaSource" - | key: "x" - | features: { - | commentsFeature: { - | def: "bar" - | aggregation: COUNT - | window: 3d - | } - | } - | } - |} + |sources: { + | swaSource: { + | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | simpleFeature: { + | def: "aggregationWindow" + | aggregation: COUNT + | window: 3d + | } + | likesFeature: { + | def: "foo" + | aggregation: COUNT + | window: 3d + | } + | } + | } + | swaAnchor2: { + | source: "swaSource" + | key: "x" + | features: { + | commentsFeature: { + | def: "bar" + | aggregation: COUNT + | window: 3d + | } + | } + | } + 
|} """.stripMargin, observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data @@ -906,8 +906,8 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { } /** - * The test verifies that AFG works with hourly data. - */ + * The test verifies that AFG works with hourly data. + */ @Test def testAFGOutputWithHourlyData(): Unit = { val res = runLocalFeatureJoinForTest( @@ -1132,76 +1132,76 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { def testInvalidCaseWithOverrideTimeDelay: Unit = { val res = runLocalFeatureJoinForTest( joinConfigAsString = s""" - | settings: { - | observationDataTimeSettings: { - | absoluteTimeRange: { - | timeFormat: yyyy-MM-dd - | startTime: 2018-05-01 - | endTime: 2018-05-03 - | } - | } - | joinTimeSettings: { - | timestampColumn: { - | def: timestamp - | format: yyyy-MM-dd - | } - | } - |} - | - |features: [ - | - | { - | key: [x], - | featureList: ["simpleFeature", "likesFeature"] - | }, - | { - | key: [x], - | featureList: ["commentsFeature"] - | overrideTimeDelay: 8d - | } - |] + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | timeFormat: yyyy-MM-dd + | startTime: 2018-05-01 + | endTime: 2018-05-03 + | } + | } + | joinTimeSettings: { + | timestampColumn: { + | def: timestamp + | format: yyyy-MM-dd + | } + | } + |} + | + |features: [ + | + | { + | key: [x], + | featureList: ["simpleFeature", "likesFeature"] + | }, + | { + | key: [x], + | featureList: ["commentsFeature"] + | overrideTimeDelay: 8d + | } + |] """.stripMargin, featureDefAsString = """ - |sources: { - | swaSource: { - | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } - | isTimeSeries: true - | timeWindowParameters: { - | timestamp: "timestamp" - | timestamp_format: "yyyy-MM-dd" - | } - | } - |} - | - |anchors: { - | swaAnchor: { - | source: "swaSource" - | key: "x" - | features: { - | simpleFeature: { - | def: "aggregationWindow" - | aggregation: COUNT - | window: 3d - | } - | likesFeature: { - | def: "foo" - | aggregation: COUNT - | window: 3d - | } - | } - | } - | swaAnchor2: { - | source: "swaSource" - | key: "x" - | features: { - | commentsFeature: { - | def: "bar" - | aggregation: COUNT - | window: 3d - | } - | } - | } - |} + |sources: { + | swaSource: { + | location: { path: "slidingWindowAgg/localSWASimulateTimeDelay/daily" } + | isTimeSeries: true + | timeWindowParameters: { + | timestamp: "timestamp" + | timestamp_format: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | simpleFeature: { + | def: "aggregationWindow" + | aggregation: COUNT + | window: 3d + | } + | likesFeature: { + | def: "foo" + | aggregation: COUNT + | window: 3d + | } + | } + | } + | swaAnchor2: { + | source: "swaSource" + | key: "x" + | features: { + | commentsFeature: { + | def: "bar" + | aggregation: COUNT + | window: 3d + | } + | } + | } + |} """.stripMargin, observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data