Kafka Stream : utiliser l'API Processor en plus de Stream DSL

avr. 27, 2021

Tout le monde a déjà entendu parler de Kafka et lu maints articles à ce sujet. Tout le monde a testé les “consumers” et “producers”, tout le monde a développé un “stream” pour compter des mots… Mais lorsque l’on sort des sentiers battus, il est difficile de trouver des articles sur comment utiliser la  Stream Processor API  dans des cas spécifiques. Cette dernière permet d’écrire des Streams Kafka lorsque  Streams DSL  n’est pas suffisant. Ces API permettent de définir la  Topology  d’un stream représentant la logique métier à appliquer sur la donnée traitée..

Bien entendu, je pense qu’il faut bien réfléchir avant d’utiliser un outil d’une façon différente de ce pour quoi il a été conçu. La question que l’on doit se poser est “Est-ce que ma façon d’implémenter mon besoin est justifiée, est-ce que j’utilise le bon outil ?” (Il est toujours possible d’ouvrir des huîtres avec un tournevis…)



Quel est le besoin ?

Dans le cadre d’un projet client, des boîtiers installés sur des véhicules envoient des données captées sur ces derniers et sont transmises au format protocol buffer via le protocole MQTT sur des brokers en mode streaming. C’est un cas d’usage standard qui est facile à mettre en place dans la majeure partie des cas. Cela jusqu’au jour où de nouveaux boîtiers ont leur propre logique : stocker la donnée pour l’agréger au format trajet avant de l’envoyer. 

Cela induit des messages volumineux qui doivent être découpés en sous messages par les boitiers lorsque leur taille dépasse la taille maximum autorisée par le broker pour un message ou que la qualité du réseau ne permet pas d’envoyer des messages trop volumineux. De plus, ces messages peuvent être envoyés dans le désordre, sur une période plus ou moins longue et sans assurance de tous les recevoir (1). Enfin, les données d’un trajet doivent être renvoyées à un tiers une fois le trajet reçu en un seul message reconstruit.



 L'implémentation

Le processus

Le processus décrit dans le schéma suivant permet de valider l’envoi d’un trajet consolidé une fois qu’il est entièrement reçu. 


 Il doit répondre aux deux cas suivants :

  • Le boitier envoie les données en mode streaming: les paquets ne font pas partie d’un trajet agrégé par le boîtier
  • Le boîtier envoie les données en mode trajet: le boîtier agrège les données d’un trajet et les envoie au broker une fois celui-ci clos. Si le volume de données d’un trajet excède une taille maximale autorisée pour un paquet alors il est découpé en sous messages.

Le contenu du message

Pour qu’il soit possible de savoir si un message est de type trajet, une information facultative doit être rajoutée dans le message (cf .proto (2)). Le champs “Trip” est présent si le boîtier émet des données en mode “trajet”, et contient le nombre de paquets représentant le trajet. Cette information permettra de savoir si tous les paquets d’un trajet sont reçus.


message Trip {    required int64  start  = 1;    required int64  end    = 2;    required uint32 chunk  = 3;    required uint32 chunks = 4; }
message Header {    required int64 ts     = 1;    required int64 device = 2;    optional Trip  trip   = 3; }


Reconstruire un trajet côté serveur

Comme évoqué précédemment, l’API  Streams DSL  de Kafka ne permet pas de gérer ce cas. De prime abord nous pourrions penser qu’il est possible de le gérer avec les  Sessions Windows,  ces dernières sont représentées par des périodes d’activité séparées par des périodes d’inactivité. Si les boitiers envoient les données en mode streaming il est conseillé d’utiliser cette fonctionnalité car elle correspond à la définition d’un trajet. Mais dans notre cas, le trajet est déjà construit par le boîtier et il faut le reconstruire côté serveur. 

Dans notre cas, nous considérons qu’un trajet peut être reconstruit côté serveur lorsque tous les paquets d’un même trajet sont reçus ; si le TTL d’attente de tous les paquets est atteint nous considérons que nous ne pouvons pas envoyer le trajet incomplet au tiers.

De plus, le stream utilisé pour reconstruire les trajets ne doit pas stocker toute la donnée, seulement les index des paquets reçus afin de ne pas avoir de messages de taille trop importante et d’informations inutiles. L’idée est de réduire l’impact mémoire des messages qui transitent par Kafka, dans notre cas seules les métadonnées sont nécessaires pour définir un trajet.

Pour répondre à cette problématique il faut donc utiliser l’ API Processor  et plus particulièrement son  intégration avec l’API Stream DSL . Voici une liste des composants importants:

  • Un objet Trip définit par
  • un device (l’identifiant boitier)
  • une date de début de trajet 
  • une date de fin de trajet

  • Un objet Status qui pour chaque trajet définit:
  • les index des paquets reçus 
  • le nombre de paquets total 

  • Un (De)-Serializer (Avro ou JSON (3)) pour les objets Trip & Status, déjà fourni par les  exemples  Kafka

  • Un State Store pour associer à chaque clé (Trip) une valeur Status du trajet

Stores.keyValueStoreBuilder(    Stores.persistentKeyValueStore("STORE-TRIPS"),    Serdes.serdeFrom(TripSerializer, TripDeserializer),    Serdes.serdeFrom(StatusSerializer, StatusDeserializer) );

  • Un Transformer pour la logique spécifique de reconstruction d’un trajet.
 package xxx;
import [...] import com.google.protobuf.Message;
public class ChunkedTripTransformer implements ValueTransformerWithKey<Key, Message, Trip> {   private static final Logger LOGGER = LoggerFactory.getLogger(ChunkedTripTransformer.class);   // TTL autorisé pour avoir la liste complète des paquets composants le message   private final Duration delta;   // Store utilisé pour stocker le status de chaque trajet   private KeyValueStore<Trip, Status> kvStore;   private ProcessorContext context;
  public ChunkTransformer(Duration delta) {     this.delta = delta;   }
  @Override   public void init(ProcessorContext context) {     this.context = context;     kvStore = (KeyValueStore<Trip,Status>) context.getStateStore("STORE-TRIP-STATUS");     context.schedule(Duration.ofDays(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {       KeyValueIterator<Trip, Status> iter = this.kvStore.all();       while (iter.hasNext()) {         KeyValue<Trip, Status> entry = iter.next();         if (Instant.ofEpochMilli(entry.value.getStartDate()).isBefore(Instant.ofEpochMilli(timestamp - delta.toMillis()))) {           // Suppression des trajets non reçu après expiration du TTL           this.kvStore.delete(entry.key);         }       }       iter.close();       context.commit();     });   }
  @Override   public Trip transform(Trip key, com.google.protobuf.Message value) {     int chunk = value.getHeader().getTrip().getChunk();     // Si le paquet reçu a un index de chunk incohérent il n’est pas traité     if (chunk <= 0    value.getHeader().getTrip().getChunks() < chunk) {       return null;     }     // Si le trajet n’est composé que d’un seul paquet alors le trajet est complet     if (value.getHeader().getTrip().getChunks() == 1) {       return new Trip(         key.getDeviceId(),         Instant.ofEpochMilli(value.getHeader().getTrip().getStart()),         Instant.ofEpochMilli(value.getHeader().getTrip().getEnd())       );     }     Trip trip = new Trip(key.getDeviceId(), value.getHeader().getTrip().getStart(), value.getHeader().getTrip().getEnd());     Status status = this.kvStore.get(trip);     // Si aucun status n’est présent dans le store alors c’est le premier paquet reçu pour le trajet     if (status == null) {       this.kvStore.put(key, new Status(value.getHeader().getTrip().getChunks(), chunk));     } else {       // Un paquet du trajet est réçu mais ce dernier n’est pas encore complet       if (!status.received(chunk)) {         return null;       }       // Le trajet est complet il est donc émis et supprimé du Store       if (status.completed()) {         this.kvStore.delete(key);         return key;       }       // Mise à jour du status existant pour le trajet       this.kvStore.put(key, status);     }     return null;   }
  @Override   public void close() {     // nothing to do   } }

Conclusion

Dans la majeure partie des cas l’API Stream DSL fournie par Kafka permet de répondre au besoin d’implémentation. Dans cet exemple, le fait d’utiliser un Transformer custom a permis de répondre à la problématique, mais dans certains cas il faut utiliser l’API bas niveau pour construire entièrement le stream en utilisant des Source, Processors et Sink. 

La définition de la Topology établie uniquement avec l’API processor permet une grande liberté d’implémentation mais s’avère verbeuse et contraignante. La couche d’abstraction fournie par l’API Stream DSL simplifie l’écriture des streams et la possibilité de mixer les deux API Processor & Stream DSL évite d’avoir à tout écrire avec uniquement des processors dans certains cas.

Dans cet article nous nous sommes focalisés sur un cas précis, mais lorsqu’est abordé un sujet avec Kafka, il faut commencer par travailler sur la modélisation des données, sur les clés et valeurs qui doivent transiter dans le broker. La clé d’un message est très importante pour le partitioning et un choix inadapté peut avoir des conséquences sur l’écriture des streams et des consumers notamment.

N’hésitez pas à nous contacter si vous avez des questions sur ce sujet ou sur Kafka en général, nous nous ferons un plaisir de vous répondre et d’échanger avec vous !

1 Le protocole  MQTT  avec un QoS 2 assure qu’un message est envoyé et est reçu une seule fois. Mais dans le cadre du projet les données sont envoyées avec une connexion GSM et certains véhicules roulant à l’étranger peuvent rester de longues périodes dans des zones blanches.

2 Le .proto est le fichier qui définit le format des  messages  Protocol Buffer.

3 Dans notre cas nous utilisons un Schema Registry avec des schémas Avro. 

Ressources Agaetis

par David Walter 16 févr., 2024
OpenAI, a récemment dévoilé SORA, un outil de génération de vidéo. SORA monte encore une marche, offrant des capacités de génération de vidéos réalistes. Cet article explore les caractéristiques clés de SORA, son impact potentiel sur diverses industries, les points de réflexions et l'impact pour l'avenir de la création de contenu. Qu'est-ce que SORA ? SORA est une interface avancée conçue par OpenAI qui permet de générer des séquences vidéo à partir de descriptions textuelles simples. Utilisant des techniques de pointe en matière d'intelligence artificielle et d'apprentissage profond, SORA est capable de comprendre des commandes complexes et de les traduire en contenus visuels impressionnants. Une qualité de génération inégalée La capacité de SORA à générer des vidéos époustouflantes souligne un tournant dans le domaine de la production vidéo, où la qualité et la créativité ne sont plus entravées par des contraintes techniques ou financières. Cette avancée s'inscrit dans un contexte plus large où l'IA transforme profondément les industries créatives, offrant des outils puissants pour la transcription, le doublage, la création d'avatars générés par IA, et même la suppression de fonds vidéo, rendant ces processus plus accessibles et flexibles​​​​​​. Des outils comme Descript et Adobe Premiere Pro intègrent des fonctionnalités AI pour améliorer le processus d'édition vidéo, depuis la rotation des yeux jusqu'à la génération de transcriptions et sous-titres​​. De même, la comparaison entre DALL-E 3 et Midjourney montre comment l'IA peut capturer des détails et des ambiances spécifiques dans les images, un principe également applicable à la vidéo​​. La révolution du streaming vidéo illustre comment l'adaptation numérique bouleverse les modèles économiques traditionnels, offrant une perspective sur la manière dont les technologies génératives pourraient remodeler le paysage médiatique​​. L'impact de ces technologies dépasse la simple création de contenu ; elles remodèlent également notre compréhension de la créativité et ouvrent de nouvelles voies pour l'expression artistique et la communication. Avec des outils comme SORA, la barrière entre l'idée et sa réalisation se réduit, permettant à un plus grand nombre de personnes de donner vie à leurs visions créatives sans les contraintes traditionnelles de la production vidéo. Cet élan vers une qualité de génération inégalée par l'IA soulève des questions importantes sur l'avenir du contenu créatif et la manière dont nous valorisons l'interaction entre l'humain et la technologie dans le processus créatif. Alors que nous explorons ces nouvelles frontières, il est crucial de rester attentifs aux implications éthiques et aux défis que ces technologies posent, tout en reconnaissant leur potentiel pour enrichir notre monde visuel et narratif.
Airflow PostgreSQL MongoDB
par Ikram Zouaoui 07 févr., 2024
Integration de technologies pour optimiser les flux de travail : L'article met en lumière une approche combinée utilisant Airflow, PostgreSQL, et MongoDB pour améliorer l'efficacité des flux de travail liés aux données.
Show More
Share by: