From 034149df4767548cd988c527327fec8dbe092daf Mon Sep 17 00:00:00 2001 From: baskaran eswaran Date: Fri, 28 Dec 2018 00:54:02 +0800 Subject: [PATCH 1/2] KafkaIo implementation for feast along with other refactors --- ingestion/pom.xml | 174 ++++++++++++++++-- .../main/java/feast/ingestion/ImportJob.java | 17 +- .../deserializer/FeatureRowDeserializer.java | 33 ++++ .../FeatureRowKeyDeserializer.java | 33 ++++ .../ingestion/options/ImportJobOptions.java | 6 +- .../ingestion/transform/FeatureEnums.java | 15 ++ .../transform/FeatureRowKafkaIO.java | 71 +++++++ .../transform/ReadFeaturesTransform.java | 65 ++++--- .../KafkaFeatureRowDeserializerTest.java | 127 +++++++++++++ protos/feast/types/FeatureRow.proto | 6 + 10 files changed, 498 insertions(+), 49 deletions(-) create mode 100644 ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java create mode 100644 ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java create mode 100644 ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java diff --git a/ingestion/pom.xml b/ingestion/pom.xml index 590c80bf159..dcad0707456 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -34,6 +34,7 @@ 1.35.0 1.2.0 4.1.0 + 2.2.2.RELEASE @@ -66,6 +67,125 @@ + + + direct-runner + + + org.apache.beam + beam-runners-direct-java + ${org.apache.beam.version} + runtime + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + ${project.artifactId}-direct + + + + feast.ingestion.ImportJob + + + + + + + + + + + flink-runner + + + org.apache.beam + beam-runners-flink_2.11 + ${org.apache.beam.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + ${project.artifactId}-flink + + + + + feast.ingestion.ImportJob + + + reference.conf + + + + + + + + + + + dataflow-runner + + true + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${org.apache.beam.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + ${project.artifactId}-dataflow + + + + feast.ingestion.ImportJob + + + + + + + + + + + org.hibernate.validator @@ -214,20 +334,19 @@ org.apache.beam - beam-sdks-java-io-jdbc + beam-runners-google-cloud-dataflow-java ${org.apache.beam.version} - org.apache.beam - beam-runners-direct-java + beam-sdks-java-io-jdbc ${org.apache.beam.version} org.apache.beam - beam-runners-google-cloud-dataflow-java + beam-sdks-java-io-kafka ${org.apache.beam.version} @@ -306,12 +425,6 @@ 42.2.5 - - org.apache.beam - beam-runners-flink_2.11 - ${org.apache.beam.version} - - com.github.kstyrc embedded-redis @@ -325,6 +438,45 @@ 1.9.1 test - + + com.google.guava + guava + 26.0-jre + compile + + + org.apache.kafka + kafka-clients + 2.0.0 + + + org.springframework.boot + spring-boot-starter-test + 2.1.1.RELEASE + test + + + org.springframework.kafka + spring-kafka + ${spring.kafka.version} + test + + + org.springframework.kafka + spring-kafka-test + ${spring.kafka.version} + test + + + org.apache.beam + beam-runners-flink_2.11 + ${org.apache.beam.version} + + + org.apache.beam + beam-runners-direct-java + ${org.apache.beam.version} + runtime + diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 
47455e65371..67e5cb76007 100644
--- a/ingestion/src/main/java/feast/ingestion/ImportJob.java
+++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -18,6 +18,8 @@
 package feast.ingestion;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.dataflow.DataflowScopes;
+import com.google.auth.oauth2.GoogleCredentials;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -39,7 +41,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
@@ -60,6 +61,7 @@
 import org.joda.time.Duration;
 import org.slf4j.event.Level;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
@@ -104,13 +106,16 @@ public static void main(String[] args) {
 
   public static PipelineResult mainWithResult(String[] args) {
     log.info("Arguments: " + Arrays.toString(args));
-    ImportJobOptions options =
-        PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
+    ImportJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
     if (options.getJobName().isEmpty()) {
       options.setJobName(generateName());
     }
-    log.info(options.toString());
-
+    try {
+      options.setGcpCredential(GoogleCredentials.getApplicationDefault().createScoped(DataflowScopes.all()));
+    } catch (IOException e) {
+      log.error("Exception while setting gcp credential manually", e);
+    }
+    log.info("options: " + options.toString());
     ImportSpec importSpec = new ImportSpecSupplier(options).get();
     Injector injector =
         Guice.createInjector(new ImportJobModule(options, importSpec), new PipelineModule());
@@ -206,8 +211,6 @@ private String retrieveId(PipelineResult result) {
     Class<? extends PipelineRunner<?>> runner = options.getRunner();
     if (runner.isAssignableFrom(DataflowRunner.class)) {
       return ((DataflowPipelineJob) result).getJobId();
-    } else if (runner.isAssignableFrom(FlinkRunner.class)) {
-      throw new UnsupportedOperationException("Runner not yet supported.");
     } else {
       return this.options.getJobName();
     }
diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java
new file mode 100644
index 00000000000..f83c6fb0f6e
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java
@@ -0,0 +1,33 @@
+package feast.ingestion.deserializer;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import feast.types.FeatureRowProto.FeatureRow;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * Deserializer for Kafka to deserialize Protocol Buffers messages
+ * into {@link FeatureRow} values.
+ *
+ */
+public class FeatureRowDeserializer implements Deserializer<FeatureRow> {
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+  }
+
+  @Override
+  public FeatureRow deserialize(String topic, byte[] data) {
+    try {
+      return FeatureRow.parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file
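The deserializers introduced here are wired into Beam's KafkaIO later in this patch, but they also work with a plain Kafka consumer. The following is a minimal sketch of that standalone use, assuming only the kafka-clients API already declared in the pom; the broker address, group id, and topic name are placeholder values, not anything defined by this patch.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import feast.ingestion.deserializer.FeatureRowDeserializer;
    import feast.types.FeatureRowProto.FeatureRow;

    public class FeatureRowConsumerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-row-sketch");      // placeholder group id
        // Keys stay as raw bytes; values are decoded straight into FeatureRow protos.
        try (KafkaConsumer<byte[], FeatureRow> consumer =
            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new FeatureRowDeserializer())) {
          consumer.subscribe(Collections.singletonList("feature-rows")); // placeholder topic
          consumer.poll(Duration.ofSeconds(5))
              .forEach(record -> System.out.println(record.value().getEntityName()));
        }
      }
    }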
diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java
new file mode 100644
index 00000000000..74c5b03fd9d
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java
@@ -0,0 +1,33 @@
+package feast.ingestion.deserializer;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import feast.types.FeatureRowProto.*;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * Deserializer for Kafka to deserialize Protocol Buffers messages
+ * into {@link FeatureRowKey} values.
+ *
+ */
+public class FeatureRowKeyDeserializer implements Deserializer<FeatureRowKey> {
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+  }
+
+  @Override
+  public FeatureRowKey deserialize(String topic, byte[] data) {
+    try {
+      return FeatureRowKey.parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      throw new SerializationException("Error deserializing FeatureRowKey from Protobuf message", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java
index 6213f513313..687a8213e57 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java
@@ -19,6 +19,8 @@
 
 import com.google.auto.service.AutoService;
 import java.util.Collections;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.metrics.MetricsSink;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -26,14 +28,14 @@
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.options.Validation.Required;
 
-public interface ImportJobOptions extends PipelineOptions {
+public interface ImportJobOptions extends PipelineOptions, FlinkPipelineOptions, GcpOptions {
   @Description("Import spec yaml file path")
   @Required(groups = {"importSpec"})
   String getImportSpecYamlFile();
 
   void setImportSpecYamlFile(String value);
 
-  @Description("Import spec as native proto binary encoding conveted to Base64 string")
+  @Description("Import spec as native proto binary encoding converted to Base64 string")
   @Required(groups = {"importSpec"})
   String getImportSpecBase64();
 
diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java
new file mode 100644
index 00000000000..77985082cd0
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java
@@ -0,0 +1,15 @@
+package feast.ingestion.transform;
+
+public class FeatureEnums {
+  public enum InputSource {
+    FILE,
+    BIGQUERY,
+    PUBSUB,
+    KAFKA
+  }
+
+  public enum FileFormat {
+    CSV,
+    JSON
+  }
+}
\ No newline at end of file
diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
new file mode 100644
index 00000000000..1655f1e5cad
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
@@ -0,0 +1,71 @@
+package feast.ingestion.transform;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import feast.ingestion.deserializer.FeatureRowDeserializer;
+import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
+import feast.options.OptionsParser;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FeatureRowProto.FeatureRowKey;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class FeatureRowKafkaIO {
+
+  static final String KAFKA_TYPE = "kafka";
+
+  public static Read read(ImportSpec importSpec) {
+    return new Read(importSpec);
+  }
+
+  public static class Read extends FeatureIO.Read {
+
+    private ImportSpec importSpec;
+
+    private Read(ImportSpec importSpec) {
+      this.importSpec = importSpec;
+    }
+
+    @Override
+    public PCollection<FeatureRow> expand(PInput input) {
+
+      checkArgument(importSpec.getType().equals(KAFKA_TYPE));
+
+      String bootstrapServer = importSpec.getOptionsMap().get("server");
+
+      Preconditions.checkArgument(
+          !Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");
+
+      String topic = importSpec.getOptionsMap().get("topic");
+
+      Preconditions.checkArgument(
+          !Strings.isNullOrEmpty(topic), "kafka topic must be set");
+
+      KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read()
+          .withBootstrapServers(bootstrapServer)
+          .withTopic(topic)
+          .withKeyDeserializer(FeatureRowKeyDeserializer.class)
+          .withValueDeserializer(FeatureRowDeserializer.class);
+
+      PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord = input.getPipeline().apply(kafkaIOReader);
+
+      PCollection<FeatureRow> featureRow = featureRowRecord.apply(
+          ParDo.of(
+              new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
+                @ProcessElement
+                public void processElement(ProcessContext processContext) {
+                  KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
+                  processContext.output(record.getKV().getValue());
+                }
+              }));
+      return featureRow;
+    }
+  }
+}
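For orientation, the sketch below shows how this new transform could be driven from a Beam pipeline. It assumes the generated ImportSpec builder exposes the usual protobuf setters (setType, putOptions) implied by the getOptionsMap calls above; the bootstrap server and topic names are placeholders, not values defined by the patch.

    import feast.ingestion.transform.FeatureRowKafkaIO;
    import feast.specs.ImportSpecProto.ImportSpec;
    import feast.types.FeatureRowProto.FeatureRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class KafkaReadPipelineSketch {
      public static void main(String[] args) {
        // Options checked by FeatureRowKafkaIO.Read: "server" and "topic" (placeholder values here).
        ImportSpec importSpec = ImportSpec.newBuilder()
            .setType("kafka")
            .putOptions("server", "localhost:9092")
            .putOptions("topic", "feature-rows")
            .build();

        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Reads FeatureRow protos from the configured Kafka topic.
        PCollection<FeatureRow> rows =
            pipeline.apply("ReadFeatureRows", FeatureRowKafkaIO.read(importSpec));
        pipeline.run();
      }
    }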
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
index 38477031aa3..af7945bc777 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
@@ -17,6 +17,7 @@
 
 package feast.ingestion.transform;
 
+import com.google.common.base.Enums;
 import com.google.common.base.Preconditions;
 import feast.storage.bigquery.FeatureRowBigQueryIO;
 import com.google.inject.Inject;
@@ -28,37 +29,43 @@
 public class ReadFeaturesTransform extends PTransform<PInput, PCollection<FeatureRow>> {
 
-  private ImportSpec importSpec;
+    private ImportSpec importSpec;
 
-  @Inject
-  public ReadFeaturesTransform(ImportSpec importSpec) {
-    this.importSpec = importSpec;
-  }
+    @Inject
+    public ReadFeaturesTransform(ImportSpec importSpec) {
+        this.importSpec = importSpec;
+    }
 
-  @Override
-  public PCollection<FeatureRow> expand(PInput input) {
-    return input.getPipeline().apply("Read " + importSpec.getType(), getTransform());
-  }
+    @Override
+    public PCollection<FeatureRow> expand(PInput input) {
+        return input.getPipeline().apply("Read " + importSpec.getType(), getTransform());
+    }
 
-  public PTransform<PInput, PCollection<FeatureRow>> getTransform() {
-    String type = importSpec.getType();
-    Preconditions.checkArgument(!type.isEmpty(), "type missing in import spec");
-    if (type.equals("file")) {
-      String format = importSpec.getOptionsOrDefault("format", null);
-      Preconditions.checkNotNull(format, "format option missing from import spec of type file");
-      if (format.equals("csv")) {
-        return FeatureRowCsvIO.read(importSpec);
-      } else if (format.equals("json")) {
-        return FeatureRowJsonTextIO.read(importSpec);
-      } else {
-        throw new IllegalArgumentException("Unknown format in import spec" + type);
-      }
-    } else if (type.equals("bigquery")) {
-      return FeatureRowBigQueryIO.read(importSpec);
-    } else if (type.equals("pubsub") || type.equals("pubsub")) {
-      return FeatureRowPubSubIO.read(importSpec);
-    } else {
-      throw new IllegalArgumentException("Unknown type in import spec" + type);
+    public PTransform<PInput, PCollection<FeatureRow>> getTransform() {
+        String type = importSpec.getType();
+        Preconditions.checkArgument(!type.isEmpty(), "type missing in import spec");
+        Preconditions.checkArgument(Enums.getIfPresent(FeatureEnums.InputSource.class, type.toUpperCase()).isPresent(), "The type defined is invalid or not supported");
+        switch (FeatureEnums.InputSource.valueOf(type.toUpperCase())) {
+            case FILE:
+                String format = importSpec.getOptionsOrDefault("format", null);
+                Preconditions.checkNotNull(format, "format option missing from import spec of type file");
+                switch (FeatureEnums.FileFormat.valueOf(format.toUpperCase())) {
+                    case CSV:
+                        return FeatureRowCsvIO.read(importSpec);
+                    case JSON:
+                        return FeatureRowJsonTextIO.read(importSpec);
+                    default:
+                        throw new IllegalArgumentException("Unknown format in import spec: " + format);
+                }
+            case BIGQUERY:
+                return FeatureRowBigQueryIO.read(importSpec);
+            case PUBSUB:
+                return FeatureRowPubSubIO.read(importSpec);
+            case KAFKA:
+                return FeatureRowKafkaIO.read(importSpec);
+            default:
+                throw new IllegalArgumentException("Unknown type in import spec: " + type);
+        }
+    }
-    }
-  }
 }
+
diff --git a/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java b/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java
new file mode 100644
index 00000000000..c6fc30d1366
--- /dev/null
+++ b/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java
@@ -0,0 +1,127 @@
+package feast.ingestion.deserializer;
+
+
+import com.google.protobuf.MessageLite;
+import feast.types.FeatureRowProto.FeatureRow;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.ContainerTestUtils;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@RunWith(SpringRunner.class)
+@EmbeddedKafka(controlledShutdown = true)
+public class KafkaFeatureRowDeserializerTest {
+
+  @Configuration
+  static class ContextConfiguration {
+
+    @Autowired
+    private EmbeddedKafkaBroker embeddedKafka;
+
+    @Bean
+    ProducerFactory<byte[], byte[]> producerFactory() {
+      Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
+
+      return new DefaultKafkaProducerFactory<>(producerProps,
+          new ByteArraySerializer(),
+          new ByteArraySerializer());
+    }
+
+    @Bean
+    KafkaTemplate<byte[], byte[]> kafkaTemplate() {
+      return new KafkaTemplate<>(
+          producerFactory(),
+          true);
+    }
+  }
+
+  @Autowired
+  private EmbeddedKafkaBroker embeddedKafka;
+
+  @Autowired
+  private KafkaTemplate<byte[], byte[]> template;
+
+  private <MessageType extends MessageLite> void deserialize(MessageType input) {
+    // generate a random UUID to create a unique topic and consumer group id for each test
+    String uuid = UUID.randomUUID().toString();
+    String topic = "topic-" + uuid;
+
+    embeddedKafka.addTopics(topic);
+
+    Deserializer<FeatureRow> deserializer = new FeatureRowDeserializer();
+
+    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
+        uuid, Boolean.FALSE.toString(), embeddedKafka);
+    ConsumerFactory<FeatureRow, FeatureRow> consumerFactory = new DefaultKafkaConsumerFactory<>(
+        consumerProps,
+        deserializer, deserializer);
+
+    BlockingQueue<ConsumerRecord<FeatureRow, FeatureRow>> records = new LinkedBlockingQueue<>();
+    ContainerProperties containerProps = new ContainerProperties(topic);
+    containerProps.setMessageListener((MessageListener<FeatureRow, FeatureRow>) records::add);
+
+    MessageListenerContainer container = new KafkaMessageListenerContainer<>(
+        consumerFactory,
+        containerProps);
+    container.start();
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
+
+    byte[] data = input.toByteArray();
+    ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, data, data);
+    ListenableFuture<SendResult<byte[], byte[]>> producerFuture = template.send(producerRecord);
+
+    try {
+      producerFuture.get();
+    } catch (InterruptedException e) {
+      return;
+    } catch (ExecutionException e) {
+      throw new KafkaException("Error sending message to Kafka.", e.getCause());
+    }
+
+    ConsumerRecord<FeatureRow, FeatureRow> consumerRecord;
+    try {
+      consumerRecord = records.take();
+    } catch (InterruptedException e) {
+      return;
+    }
+
+    FeatureRow key = consumerRecord.key();
+    Assert.assertEquals(key, input);
+
+    FeatureRow value = consumerRecord.value();
+    Assert.assertEquals(value, input);
+  }
+
+  @Test(timeout = 10000)
+  public void deserializeFeatureRowProto() {
+    FeatureRow message = FeatureRow.newBuilder()
+        .setEntityName("test")
+        .build();
+    deserialize(message);
+  }
+}
diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto
index 537bc1a540f..60199955fc8 100644
--- a/protos/feast/types/FeatureRow.proto
+++ b/protos/feast/types/FeatureRow.proto
@@ -27,6 +27,12 @@ option java_package = "feast.types";
 option java_outer_classname = "FeatureRowProto";
 option go_package = "github.com/gojektech/feast/protos/generated/go/feast/types";
 
+message FeatureRowKey {
+    string entityKey = 1;
+    google.protobuf.Timestamp eventTimestamp = 3;
+    string entityName = 4;
+    Granularity.Enum granularity = 5;
+}
 message FeatureRow {
     string entityKey = 1;
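Before the second patch, a note on the wire format that the new FeatureRowKey message and the two deserializers imply: both the record key and the record value travel as plain serialized protos. A small sketch of the producer side of that contract, with placeholder entity values:

    import feast.types.FeatureRowProto.FeatureRow;
    import feast.types.FeatureRowProto.FeatureRowKey;

    public class FeatureRowWireFormatSketch {
      // Byte arrays in the shape FeatureRowKeyDeserializer / FeatureRowDeserializer expect.
      static byte[] keyBytes() {
        FeatureRowKey key = FeatureRowKey.newBuilder()
            .setEntityKey("driver-1234")   // placeholder entity key
            .setEntityName("driver")       // placeholder entity name
            .build();
        return key.toByteArray();
      }

      static byte[] valueBytes() {
        FeatureRow row = FeatureRow.newBuilder()
            .setEntityKey("driver-1234")
            .setEntityName("driver")
            .build();
        return row.toByteArray();
      }
    }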
From 85c853ff53e8d75599005bada5acae9511a30818 Mon Sep 17 00:00:00 2001
From: baskaran eswaran
Date: Fri, 28 Dec 2018 02:06:28 +0800
Subject: [PATCH 2/2] add support for reading from multiple kafka topics

---
 .../transform/FeatureRowKafkaIO.java          | 35 +++++++++++++++++--
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
index 1655f1e5cad..e8b8becfd96 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
@@ -1,3 +1,20 @@
+/*
+ * Copyright 2018 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package feast.ingestion.transform;
 
 import com.google.common.base.Preconditions;
@@ -15,12 +32,22 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class FeatureRowKafkaIO {
 
   static final String KAFKA_TYPE = "kafka";
 
+
+  /**
+   * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow}
+   * proto messages from one or more kafka topics.
+   *
+   */
   public static Read read(ImportSpec importSpec) {
     return new Read(importSpec);
   }
@@ -43,14 +70,16 @@ public PCollection<FeatureRow> expand(PInput input) {
       Preconditions.checkArgument(
           !Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");
 
-      String topic = importSpec.getOptionsMap().get("topic");
+      String topics = importSpec.getOptionsMap().get("topics");
 
       Preconditions.checkArgument(
-          !Strings.isNullOrEmpty(topic), "kafka topic must be set");
+          !Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");
+
+      List<String> topicsList = new ArrayList<>(Arrays.asList(topics.split(",")));
 
       KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read()
          .withBootstrapServers(bootstrapServer)
-          .withTopic(topic)
+          .withTopics(topicsList)
          .withKeyDeserializer(FeatureRowKeyDeserializer.class)
          .withValueDeserializer(FeatureRowDeserializer.class);
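With the second patch applied, a job points at several topics by switching the option key from "topic" to a comma separated "topics" entry. A sketch of the corresponding ImportSpec, again assuming the standard generated protobuf builder methods and using placeholder topic names:

    import feast.specs.ImportSpecProto.ImportSpec;

    public class MultiTopicImportSpecSketch {
      // Patch 2 reads the "topics" option and splits it on commas before calling
      // KafkaIO's withTopics(...). Both topic names below are placeholders.
      static ImportSpec multiTopicSpec() {
        return ImportSpec.newBuilder()
            .setType("kafka")
            .putOptions("server", "localhost:9092")
            .putOptions("topics", "driver-features,customer-features")
            .build();
      }
    }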