от
Я новичок в потоковой передаче kafka-spark и пытаюсь реализовать примеры из документации spark с помощью протокола буферного сериализатора / десериализатора. До сих пор я следовал официальным учебникам по https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html https://developers.google.com/protocol-buffers/docs/javatutorial и теперь я застрял со следующей проблемой. Этот вопрос может быть похож на этот пост. Как десериализовать записи из Kafka, используя структурированный поток в Java? Я уже реализовал успешный сериализатор, который пишет сообщения на тему кафки. Теперь задача состоит в том, чтобы потреблять его с помощью искровой структурированной потоковой передачи с помощью специального десериализатора.
public class CustomDeserializer implements Deserializer {

@Override
public Person deserialize(String topic, byte[] data) {
    Person person = null;
    try {
        person = Person.parseFrom(data);

        return person;
    } catch (Exception e) {
               //ToDo
    }

    return null;
 }


Dataset dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.deserializer", "de.myproject.CustomDeserializer")
        .load()
        .select("value");

    dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();
Но в качестве вывода я все еще получаю двоичные файлы.


Что касается учебника, мне просто нужно указать опцию value.deserializer, чтобы иметь читабельный формат

.option("value.deserializer", "de.myproject.CustomDeserializer")
Я что-то пропустил?              

Ваш ответ

Отображаемое имя (по желанию):
Конфиденциальность: Ваш электронный адрес будет использоваться только для отправки уведомлений.

2 Ответы

0 голосов
от
Вы пропустили этот раздел документации?   Обратите внимание, что следующие параметры Kafka не могут быть установлены, и источник или приемник Kafka вызовут исключение:         key.deserializer: ключи всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer. Используйте операции DataFrame для явной десериализации ключей.   value.deserializer: значения всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer. Используйте операции DataFrame для явной десериализации значений.    Вам придется зарегистрировать UDF, который вместо этого вызывает ваши десериализаторы Аналогично сообщению Read protobuf kafka с использованием потоковой структурированной искры     
0 голосов
от
Вам необходимо преобразовать байт в тип данных String. dataset.selectExpr («CAST (ключ как STRING)», «CAST (значение как STRING)») Затем вы можете использовать functions.from_json (dataset.col ("value"), StructType), чтобы получить реальный DF. Удачного кодирования :)     
...