reference¶
- project : kafka
- udemy:
- https://www.udemy.com/course/apache-kafka
- https://conduktor.io/apache-kafka-for-beginners
- slides : https://learn.conduktor.io/kafka/
- https://chatgpt.com/c/674a1fef-5634-800d-b445-dfa969b74011
- ccgg program : https://chatgpt.com/c/68007bd1-4a04-800d-ace4-39873a168454
kafka¶
A intro¶
- real-time data streaming pipelines. (primary task)
- data stream : unbounded/endless sequence of data; throughput can be high or low. eg:
- Log Analysis - log streams from multiple microservices.
- Web Analytics - modern web apps measure user activity.
- open-source, managed by Confluent (originated at LinkedIn)
- distributed system
    - cluster + brokers/nodes (hold TOPICS)
    - scalable and fault-tolerant to node loss.
B install¶
- 1 curl -L https://releases.conduktor.io/quick-start -o docker-compose.yml && docker compose up -d --wait && echo "Conduktor started on http://localhost:8080"
- 2 https://github.com/conduktor/kafka-beginners-course/tree/main/conduktor-platform - worked (older setup)
- next:
- for new update: https://conduktor.io/get-started
- check all containers defined in the downloaded docker-compose.yml (from https://releases.conduktor.io/quick-start)
- launch UI/conduktor : http://localhost:8080
C use cases¶
- https://chatgpt.com/c/6748bff9-8df8-800d-8faa-ac5244853529
1 as a data integration layer¶
- producer and consumer systems with different data formats, schemas, and protocols
2 Decouple systems¶
3 Microservice communication¶
4 Integration with Big Data technologies¶
5 Event-sourcing store¶
6 Activity Tracker¶
- Gather metrics from many different locations
- collect logs
- collect web user activity
D fundamental / component¶
- https://chatgpt.com/c/6748c06d-048c-800d-996e-6ca852cd0329
producer --> kafka-cluster [ broker > topic > partition ] --> consumer group/s [ consumer-1, ... ]
0 broker¶
- single Kafka server
- kafka stores data in a directory on the broker's disk.
- connecting to any one broker lets a client discover and connect to every other broker in the same cluster
- Every broker in the cluster has metadata about all the other brokers
- therefore any broker in the cluster is also called a bootstrap server.
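A minimal Java sketch (not from the course) of the bootstrap-server idea: connect to one assumed local broker (localhost:9092, as in the install section) and list every broker in the cluster via AdminClient.

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class ClusterInfo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // any single reachable broker works as the bootstrap server;
        // localhost:9092 is an assumption matching the local Conduktor setup above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // the connected broker returns metadata about every broker in the cluster
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("broker id=" + node.id() + " host=" + node.host() + ":" + node.port());
            }
        }
    }
}
```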
1 topics¶
- roughly analogous to SQL tables (not queryable)
- data is stored in binary format
- data retention: 7 days (default) + offsets.retention.minutes=24*60 (default) # broker-level config
- partitions : topic is broken down into a number of partitions
- to achieve high throughput and scalability + parallel consumer/s
- Kafka does a good job of distributing partitions evenly among the available brokers.
- up to 200,000 partitions (with zookeeper)
- without zookeeper (KRaft) - millions of partitions.
- offset - integer value that Kafka assigns to each message as it is written into a partition; starts at 0.
    ## Consumer settings to commit offsets:
    # a. Auto
    - enable.auto.commit=true
    - auto.commit.interval.ms=5000
    - offsets.retention.minutes --> once committed, the offset stays available for another 24 hr by default for that consumer group.
    # b. Manual
    - enable.auto.commit=false
    - KafkaConsumer.commitSync() / commitAsync();
    # c. Manual advanced - skip it - stores offsets externally
- replicas / In-Sync Replicas (ISR)
- to achieve resilience and availability
    cluster : broker-1, broker-2, broker-3 (3 brokers)
    Topic-1 : partition-1, partition-2, partition-3
    # initial distribution (with RF=1)
    broker-1 : [ partition-1 ]
    broker-2 : [ partition-2 ]
    broker-3 : [ partition-3 ]
    # add replication: RF=2
    broker-1 : [ partition-1, partition-3-isr1 ]
    broker-2 : [ partition-2, partition-1-isr1 ]
    broker-3 : [ partition-3, partition-2-isr1 ]
    # RF=3
    broker-1 : [ partition-1, partition-3-isr1, partition-2-isr2 ]
    broker-2 : [ partition-2, partition-1-isr1, partition-3-isr2 ]
    broker-3 : [ partition-3, partition-2-isr1, partition-1-isr2 ]
    # RF=4 (invalid) : RF must be <= number of brokers
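A hedged sketch of creating a replicated topic programmatically with AdminClient; the topic name, partition count, RF=2 and the localhost broker are illustrative assumptions.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2 -- RF must be <= number of brokers
            NewTopic topic = new NewTopic("Topic-1", 3, (short) 2);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```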
2 producer¶
- application - java/py with kafka client.
- Kafka producers only write data to the leader broker for a partition
- specify a level of acknowledgment : acks
    - acks=0 : considered written as soon as it is sent (no acknowledgment)
    - acks=1 : written once acknowledged by the leader
    - acks=all : written once acknowledged by the leader and accepted by all ISRs
- if ack not received, the producer retries.
- retries : 0 to 2^31-1 (Integer.MAX_VALUE)
- retry.backoff.ms = 100 ms (default) # retry delay
- delivery.timeout.ms= # max time allowed for delivery; afterwards an exception is thrown which the developer has to handle.
- idempotent producer
    - use kafka 3+
    - detects duplicates and prevents them.
    - retries with a duplicate check.
    - kafka 3+ sets the below by default:
        - enable.idempotence=true
        - acks=all
        - min.insync.replicas=2   # leader + at least one replica (topic/broker-level setting)
        - retries=MAX_INT
        - delivery.timeout.ms=120000   # 2 min
        - max.in.flight.requests.per.connection=5
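A minimal sketch wiring the safe/idempotent settings above into a Java producer; the factory class name and the localhost:9092 broker are assumptions, not course code.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeProducerFactory {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // "safe" settings -- defaults in Kafka 3+, shown explicitly for older clients
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000"); // 2 min
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        // note: min.insync.replicas=2 is a topic/broker-level setting, not a producer property

        return new KafkaProducer<>(props);
    }
}
```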
- archive (compress) messages:
    - at producer level
    - at broker level
        - consumes CPU cycles, thus a performance issue.
- High throughput producer:
    - partitioner class
        - RoundRobin
        - sticky (looks at batch.size + linger.ms)
    - linger.ms=10   # accumulate messages for 10 ms and then send
    - batch.size=16000   # accumulate messages up to ~16 KB and then publish
    - note: increase the above values to achieve higher throughput
    - compression (use snappy)
        - compression.type=snappy
        - spring.kafka.producer.properties.compression.type=snappy
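A similar hedged sketch for the high-throughput settings above (linger.ms, batch.size, snappy compression); the factory name and broker address are again assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class HighThroughputProducerFactory {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");            // wait up to 10 ms so records batch together
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16000");        // send a batch once ~16 KB has accumulated
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compress whole batches on the producer

        return new KafkaProducer<>(props);
    }
}
```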
3 message¶
- message-value : content
- message-key
    - null : load balanced, round-robin fashion into p1, p2, ...
    - non-null :
        - all messages that share the same key will always go to the same partition.
        - true as long as the number of partitions does not change
        - uses hashing (murmur2 algo)
- Kafka Message Serializers / Kafka Message Deserializers
    - IntegerSerializer
    - StringSerializer
    - convert the message value/key into byte streams
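An illustrative snippet of keyed vs key-less sends; the topic name, key and payloads are made up, and `producer` is assumed to come from a factory like the one above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSend {
    public static void send(KafkaProducer<String, String> producer) {
        // key "student-42" is a made-up example; every record with this key is hashed
        // (murmur2) to the same partition, so per-key ordering is preserved
        ProducerRecord<String, String> withKey =
                new ProducerRecord<>("Topic-1", "student-42", "{\"name\":\"abc\"}");
        producer.send(withKey);

        // null key: records are spread across partitions (round-robin / sticky batching)
        ProducerRecord<String, String> noKey =
                new ProducerRecord<>("Topic-1", null, "{\"name\":\"xyz\"}");
        producer.send(noKey);
    }
}
```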
4 Consumer¶
- application - java/py,etc with kafka client.
4.1 pull model (skip)¶
- instead of having Kafka brokers continuously push data to consumers,
- consumers must request data from Kafka brokers
4.2 consumer group¶
- each partition of a topic is consumed by exactly one consumer within a consumer group
- Messages are effectively divided among the consumers.
- static consumer in a group --> has group.instance.id set.
    example: topic with 2 partitions consumed by:
    - consumer-1
    - consumer-2
    - consumer-group-1 (consumer-3, consumer-4 (static, group.instance.id set))
    - if consumer-4 leaves and comes back within the threshold time, it will get the same partition again.
    <<< together, consumer-3 and consumer-4 consume all messages from the topic, dividing the workload between the two partitions.
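A hedged config sketch for a static group member like consumer-4 above; the group name, instance id and session timeout value are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberConsumer {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // assumed local broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");                // assumed group name
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-4");             // fixed id makes this a static member
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");                 // rejoin within this window -> same partitions, no rebalance
        return new KafkaConsumer<>(props);
    }
}
```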
4.3 Publish-Subscribe Behavior¶
- topic-1{p-1, p-2,p-3} --> consumer/s:
    - consumer-group-1 (consumer-1 on p-1, consumer-2 on p-2, p-3) === subscriber-1
    - consumer-group-2 (consumer-1 on p-1, consumer-2 on p-2, consumer-3 on p-3, consumer-4 :: IDLE) === subscriber-2
    - consumer-group-3 (consumer-1) === subscriber-3
    - if a SINGLE consumer consumes data from multiple partitions (p-1, p-2, p-3)
        - the message ordering is not guaranteed across multiple partitions.
4.4 commit offset :green_circle:¶
## Consumer setting to update it:
# a. Auto
- enable.auto.commit=true + auto.commit.interval.ms=5000 (5 sec, default)
# b. Manual
- enable.auto.commit=false
- consumer processing - async or sync
- KafkaConsumer.commitSync() / commitAsync();
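A minimal manual-commit consumer loop matching the settings above (enable.auto.commit=false, commitSync after processing); the topic and group names are assumptions.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");     // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");          // manual commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("Topic-1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.key() + " -> " + r.value()); // process the record
                }
                consumer.commitSync(); // commit offsets only after the batch is processed
            }
        }
    }
}
```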
4.5 Delivery semantic (just concept)¶
- scenario-1 :: at most once (max=1)
    - consumer > poll > processes synchronously (takes around 2 min)
    - offset auto-committed after 1 min of polling, while processing is still going on.
    - early offset update means messages are lost if the consumer crashes mid-processing.
- scenario-2 :: at least once (min=1)
    - consumer > poll > processes synchronously (takes around 20 sec)
    - 2 messages read, and the consumer crashed while processing the 3rd.
    - offset not updated.
    - when the consumer is up again it resumes from the old offset
    - the above 2 messages will be processed again.
    - so keep the consumer idempotent
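One possible idempotent-consumer sketch: dedupe on a message id before processing, so at-least-once redeliveries are harmless. The in-memory set is a stand-in; a real system would use a DB unique key or cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// hypothetical handler: messageId would come from the record key or a header
public class IdempotentHandler {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    public void handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return; // duplicate delivery (at-least-once replay) -- safe to skip
        }
        // ... actual business logic goes here ...
    }
}
```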
4.6 Rebalance :green_circle:¶
- whenever a consumer leaves/joins the group, a rebalance happens
    - partitions are moved between consumers.
- if a static member leaves the group and joins back within session.timeout.ms, it gets its original partitions back.
4.7 liveliness :yellow_circle:¶
- consumer liveness checks (heartbeats sent by the consumer, verified by the group-coordinator broker):
- heartbeat thread
- heartbeat.interval.ms=
- poll thread
- checks the time interval between 2 poll() calls
- max.poll.interval.ms=300000 (5 min, default) # keep it higher for big-data processing.
- side note: if needed update max.poll.records
4.8 replay :yellow_circle:¶
- consumer config : auto.offset.reset=???? # update this per topic, for replay
    - latest : read only new messages, no history
    - earliest : read from offset 0, all history
    - none : throws an exception if no offset is found
- offsets.retention.minutes=specific-value, eg: 500
- shift-by : offsets can also be reset/shifted externally (eg kafka-consumer-groups --reset-offsets)
- fact: replay has no impact on an idempotent consumer
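A sketch of one replay option: seek an already-subscribed consumer back to offset 0; the alternative is a fresh group.id with auto.offset.reset=earliest, as noted in the comment.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReplayFromBeginning {
    public static void rewind(KafkaConsumer<String, String> consumer) {
        consumer.poll(Duration.ofMillis(100));            // triggers partition assignment
        consumer.seekToBeginning(consumer.assignment());  // jump back to offset 0 for all assigned partitions
        // alternative: use a brand-new group.id with auto.offset.reset=earliest
    }
}
```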
4.9 Advanced config (skip)¶
- long polling mechanism --> fetch.max.wait.ms=
- can read from geographically the closest replica.
- partition-0 (leader) : aws-region1-az1
- partition-1 (isr) : aws-region1-az2
- a consumer running in az2 can then be configured to read from partition-1 (isr) only.
- use this, if want to reduce aws network cost
- config:
    - broker : broker.rack=usw2-az2
    - broker : replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
    - consumer : client.rack=usw2-az2 (the data-center / AZ id)
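A sketch of the consumer-side half of rack-aware fetching (client.rack); the broker-side settings are shown as comments and the rack id reuses the example value above.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RackAwareConsumerProps {
    public static Properties props() {
        // broker side (server.properties), per broker:
        //   broker.rack=usw2-az2
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "usw2-az2");             // must match broker.rack of the nearest AZ
        return props;
    }
}
```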
5 more¶
- 5.1 Kafka Connect
- Kafka Connect Source Connectors
- Kafka Connect Sink Connectors
- 5.2 Schema Registry
- 5.3 ksqlDB
- transform Kafka topics to SQL-like database
- thus can perform SQL-like operation
- 5.4 Zookeeper
- like master node in k8s cluster.
- Zookeeper is used to track cluster state, membership, and leadership.
- being eliminated from Kafka v4.x; less secure
- metadata management in the Kafka world
- perform leader elections
- stores configurations for topics and permissions.
- does NOT store consumer offsets
- ensemble / Zookeeper cluster: 3, 5, 7, ... (odd number of servers)
- 5.5 Kafka KRaft Mode (replaces Zookeeper; metadata is managed by a Raft quorum inside Kafka itself)
Programs¶
- https://chatgpt.com/c/674a1fef-5634-800d-b445-dfa969b74011
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=kafka-generic-consumer-group
    spring.kafka.consumer.auto-offset-reset=earliest/latest/none
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
producer¶
- @Autowired KafkaTemplate<String, String> kafkaTemplate;
- String message = objectMapper.writeValueAsString(student);
- kafkaTemplate.send("topic-1", message);
produce - sync + async¶
- sync - send(producerRecord).get()
- async - send(producerRecord, new Callback() { @Override onCompletion ... })
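A sketch of both send flavours with the plain KafkaProducer API; the topic name and surrounding class are assumptions.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncVsAsyncSend {
    public static void send(KafkaProducer<String, String> producer) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-1", "hello");

        // sync: block until the broker acknowledges (or the send fails)
        RecordMetadata meta = producer.send(record).get();
        System.out.println("sync -> partition=" + meta.partition() + " offset=" + meta.offset());

        // async: register a callback and keep producing
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // handle delivery failure here
                } else {
                    System.out.println("async -> partition=" + metadata.partition() + " offset=" + metadata.offset());
                }
            }
        });
    }
}
```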
produce in batch¶
- props.put("batch.size","400");
- key must be null
- props.put("partitioner.class","RR); // for demo purpose only.
- consumer side
    - read messages in a batch : poll()
    - process the batch : for (m : messages)
    - then update the offset manually
    - keep consumer code idempotent.
produce with key (k1,k2,k3)¶
- add consumer group with 3 consumer/s
- check ordering :)
consumer¶
@KafkaListener(topics = {"kafka-topic-1", "kafka-topic-2"}, groupId = "kafka-generic-consumer-group") m(String s) {...}
scenario-1: generic consumer for diff schema¶
kafka-topic-1 (schema : student)
kafka-topic-2 (schema- customer)
kafka-generic-consumer-1 : subscribed to kafka-topic-1 and kafka-topic-2.
# producer sends json
# de-serialize json as a String
# while consuming, objectMapper.readValue(jsonStr, Student.class / Customer.class)
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
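One way to sketch the generic consumer as a Spring @KafkaListener: receive the raw JSON String, branch on the topic header, then ObjectMapper.readValue into the right class. Student and Customer are the (assumed) domain classes from the scenario.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class GenericJsonConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Student / Customer are the domain classes from the scenario, defined elsewhere
    @KafkaListener(topics = {"kafka-topic-1", "kafka-topic-2"}, groupId = "kafka-generic-consumer-group")
    public void onMessage(String json, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
        if ("kafka-topic-1".equals(topic)) {
            Student student = objectMapper.readValue(json, Student.class);
            // ... handle student ...
        } else {
            Customer customer = objectMapper.readValue(json, Customer.class);
            // ... handle customer ...
        }
    }
}
```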
scenario-2 : partitions < consumer¶
Topic: topic-1 with 2 partitions (partition-0 and partition-1).
Consumer Group: topic-1-group-1.
Consumers: c1, c2, c3, c4.
# Partition Assignment
partition-0: Assigned to c1.
partition-1: Assigned to c2.
c3 and c4 are idle because there are not enough partitions for them.
scenario-3 : partitions > consumer¶
Topic: topic-1 with 4 partitions (partition-0, partition-1, partition-2, partition-3).
Consumer Group: topic-1-group-1.
Consumers: c1, c2
# Partition Assignment :
## --- Using RangeAssignor ---
c1: partition-0, partition-1.
c2: partition-2, partition-3.
## --- Using RoundRobinAssignor ---
c1: partition-0, partition-2.
c2: partition-1, partition-3.
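A short config sketch for switching the assignor used in the scenario above; the group id and broker address are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-1-group-1");
        // switch every consumer in the group from the Range assignor to RoundRobin
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        return props;
    }
}
```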