Blog Agaetis

Kafka Stream : Utiliser l’API Processor en plus de Stream DSL

27 / 04 / 2021read_time 5 min.

Tout le monde a déjà entendu parler de Kafka et lu maints articles à ce sujet.  Tout le monde a testé les “consumers” et “producers”, tout le monde a développé un “stream” pour compter des mots… Mais lorsque l’on sort des sentiers battus, il est difficile de trouver des articles sur comment utiliser la Stream Processor API dans des cas spécifiques. Cette dernière permet d’écrire des Streams Kafka lorsque Streams DSL n’est pas suffisant. Ces API permettent de définir la Topology d’un stream représentant la logique métier à appliquer sur la donnée traitée..

Bien entendu, je pense qu’il faut bien réfléchir avant d’utiliser un outil d’une façon différente de ce pour quoi il a été conçu. La question que l’on doit se poser est “Est-ce que ma façon d’implémenter mon besoin est justifiée, est-ce que j’utilise le bon outil ?” (Il est toujours possible d’ouvrir des huîtres avec un tournevis…)

Quel est le besoin ?

Dans le cadre d’un projet client, des boîtiers installés sur des véhicules envoient des données captées sur ces derniers et sont transmises au format protocol buffer via le protocole MQTT sur des brokers en mode streaming.  C’est un cas d’usage standard qui est facile à mettre en place dans la majeure partie des cas. Cela jusqu’au jour où de nouveaux boîtiers ont leur propre logique : stocker la donnée pour l’agréger au format trajet avant de l’envoyer. 

Cela induit des messages volumineux qui doivent être découpés en sous messages par les boitiers lorsque leur taille dépasse la taille maximum autorisée par le broker pour un message ou que la qualité du réseau ne permet pas d’envoyer des messages trop volumineux.  De plus, ces messages peuvent être envoyés dans le désordre, sur une période plus ou moins longue et sans assurance de tous les recevoir1. Enfin, les données d’un trajet doivent être renvoyées à un tiers une fois le trajet reçu en un seul message reconstruit.

L’implémentation

Le processus

Le processus décrit dans le schéma suivant permet de valider l’envoi d’un trajet consolidé une fois qu’il est entièrement reçu. 

Il doit répondre aux deux cas suivants:

  • Le boitier envoie les données en mode streaming: les paquets ne font pas partie d’un trajet agrégé par le boîtier
  • Le boîtier envoie les données en mode trajet: le boîtier agrège les données d’un trajet et les envoie au broker une fois celui-ci clos. Si le volume de données d’un trajet excède une taille maximale autorisée pour un paquet alors il est découpé en sous messages.

Le contenu du message

Pour qu’il soit possible de savoir si un message est de type trajet, une information facultative doit être rajoutée dans le message (cf  .proto2). Le champs “Trip” est présent si le boîtier émet des données en mode “trajet”, et contient le nombre de paquets représentant le trajet. Cette information permettra de savoir si tous les paquets d’un trajet sont reçus.

message Trip

{

   required int64   start  = 1;

   required int64   end    = 2;

   required uint32  chunk  = 3;

   required uint32  chunks = 4;

}

message Header

{

   required int64 ts     = 1;

   required int64 device = 2;

   optional Trip   trip  = 3;

}

Reconstruire un trajet côté serveur

Comme évoqué précédemment, l’API Streams DSL de Kafka ne permet pas de gérer ce cas. De prime abord nous pourrions penser qu’il est possible de le gérer avec les Sessions Windows, ces dernières sont représentées par des périodes d’activité séparées par des périodes d’inactivité. Si les boitiers envoient les données en mode streaming il est conseillé d’utiliser cette fonctionnalité car elle correspond à la définition d’un trajet. Mais dans notre cas, le trajet est déjà construit par le boîtier et il faut le reconstruire côté serveur. 

Dans notre cas, nous considérons qu’un trajet peut être reconstruit côté serveur lorsque tous les paquets d’un même trajet sont reçus ; si le TTL d’attente de tous les paquets est atteint nous considérons que nous ne pouvons pas envoyer le trajet incomplet au tiers.

De plus, le stream utilisé pour reconstruire les trajets ne doit pas stocker toute la donnée, seulement les index des paquets reçus afin de ne pas avoir de messages de taille trop importante et d’informations inutiles. L’idée est de réduire l’impact mémoire des messages qui transitent par Kafka, dans notre cas seules les métadonnées sont nécessaires pour définir un trajet.

Pour répondre à cette problématique il faut donc utiliser l’API Processor et plus particulièrement son intégration avec l’API Stream DSL. Voici une liste des composants importants:

  • Un objet Trip définit par
    • un device (l’identifiant boitier)
    • une date de début de trajet 
    • une date de fin de trajet
  • Un objet Status qui pour chaque trajet définit:
    • les index des paquets reçus 
    • le nombre de paquets total 
  • Un (De)-Serializer (Avro ou JSON3) pour les objets Trip & Status, déjà fourni par les exemples Kafka
  • Un State Store pour associer à chaque clé (Trip) une valeur Status du trajet

Stores.keyValueStoreBuilder(      

Stores.persistentKeyValueStore("STORE-TRIPS"),

       Serdes.serdeFrom(TripSerializer, TripDeserializer),       

       Serdes.serdeFrom(StatusSerializer, StatusDeserializer));

  • Un Transformer pour la logique spécifique de reconstruction d’un trajet.

package xxx;

import [...]

import com.google.protobuf.Message;

public class ChunkedTripTransformer implements ValueTransformerWithKey<Key, Message, Trip> {

 private static final Logger LOGGER = LoggerFactory.getLogger(ChunkedTripTransformer.class);

 // TTL autorisé pour avoir la liste complète des paquets composants le message

 private final Duration delta;

 // Store utilisé pour stocker le status de chaque trajet

 private KeyValueStore<Trip, Status> kvStore;

 private ProcessorContext context;

 public ChunkTransformer(Duration delta) {

   this.delta = delta;

 }

 @Override

 public void init(ProcessorContext context) {

   this.context = context;

   kvStore = (KeyValueStore<Trip,Status>) context.getStateStore("STORE-TRIP-STATUS");

   context.schedule(Duration.ofDays(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {

     KeyValueIterator<Trip, Status> iter = this.kvStore.all();

     while (iter.hasNext()) {

       KeyValue<Trip, Status> entry = iter.next();

       if (Instant.ofEpochMilli(entry.value.getStartDate()).isBefore(Instant.ofEpochMilli(timestamp - delta.toMillis()))){

  // Suppression des trajets non reçu après expiration du TTL

         this.kvStore.delete(entry.key);

       }

     }

     iter.close();

     context.commit();

   });

 }

 @Override

 public Trip transform(Trip key, com.google.protobuf.Message value) {

   int chunk = value.getHeader().getTrip().getChunk();

   // Si le paquet reçu a un index de chunk incohérent il n’est pas traité

   if (chunk <= 0 || value.getHeader().getTrip().getChunks() < chunk) {

     return null;

   }

   // Si le trajet n’est composé que d’un seul paquet alors le trajet est complet

   if (value.getHeader().getTrip().getChunks() == 1){

     return new Trip(

         key.getDeviceId(),

         Instant.ofEpochMilli(value.getHeader().getTrip().getStart()),

         Instant.ofEpochMilli(value.getHeader().getTrip().getEnd())

     );

   }

   Trip trip = new Trip(key.getDeviceId(), value.getHeader().getTrip().getStart(), value.getHeader().getTrip().getEnd());

   Status status = this.kvStore.get(trip);

   // Si aucun status n’est présent dans le store alors c’est le premier paquet reçu pour le trajet

   if (status == null) {

     this.kvStore.put(key, new Status(value.getHeader().getTrip().getChunks(), chunk));

   } else {

     // Un paquet du trajet est réçu mais ce dernier n’est pas encore complet

     if (!status.received(chunk)) {

       return null;

     }

     // Le trajet est complet il est donc émis et supprimé du Store

     if (status.completed()) {

       this.kvStore.delete(key);

       return key;       

     }

     // Mise à jour du status existant pour le trajet

     this.kvStore.put(key, status);

   }

   return null;

 }

 @Override

 public void close() {

   // nothing to do

 }

}

Conclusion

Dans la majeure partie des cas l’API Stream DSL fournie par Kafka permet de répondre au besoin d’implémentation. Dans cet exemple, le fait d’utiliser un Transformer custom a permis de répondre à la problématique, mais dans certains cas il faut utiliser l’API bas niveau pour construire entièrement le stream en utilisant des Source, Processors et Sink. 

La définition de la Topology établie uniquement avec l’API processor permet une grande liberté d’implémentation mais s’avère verbeuse et contraignante. La couche d’abstraction fournie par l’API Stream DSL simplifie l’écriture des streams et la possibilité de mixer les deux API Processor & Stream DSL évite d’avoir à tout écrire avec uniquement des processors dans certains cas.

Dans cet article nous nous sommes focalisés sur un cas précis, mais lorsqu’est abordé un sujet avec Kafka, il faut commencer par travailler sur la modélisation des données, sur les clés et valeurs qui doivent transiter dans le broker. La clé d’un message est très importante pour le partitioning et un choix inadapté peut avoir des conséquences sur l’écriture des streams et des consumers notamment.

N’hésitez pas à nous contacter si vous avez des questions sur ce sujet ou sur Kafka en général, nous nous ferons un plaisir de vous répondre et d’échanger avec vous !

1 Le protocole MQTT avec un QoS 2 assure qu’un message est envoyé et est reçu une seule fois. Mais dans le cadre du projet les données sont envoyées avec une connexion GSM et certains véhicules roulant à l’étranger peuvent rester de longues périodes dans des zones blanches.

2 Le .proto est le fichier qui définit le format des messages Protocol Buffer.

3 Dans notre cas nous utilisons un Schema Registry avec des schémas Avro.

Un sujet vous intéresse ? Une question ? Contactez-nous !

Nos adresses

Clermont-Ferrand
9, allée Evariste Galois
63170 Aubière
Tél. 04 73 35 47 51
Paris
21, rue de la banque
75002 Paris
Tél. 01 44 63 53 13
Lyon
52, Quai Rambaud
69002 Lyon