Jun Rao, Confluent – Kafka Controller: A Deep Dive | Bay Area Apache Kafka® Meetup

– So our third talk is from the co-founder of Confluent, Jun Rao, who will give us a deep dive on the Kafka controller, which is the single entry point for all kinds of administrative requests to a Kafka cluster. In older versions of Kafka, the controller has been reported as one of the main factors limiting Kafka cluster scale. So tonight Jun Rao is going to talk about all kinds of improvements that the community and Confluent have been making to lift that limit on the Kafka controller.

– Thanks, everyone. Before I start, can I do
a quick show of hands: how many people have used Kafka in production? That's pretty much everybody. All right, sounds good. So what I'm gonna do today is talk about what the controller in Kafka is, what are some of the common things a controller does, what are some of the common problems we have seen, and what we have done to improve it. To talk about the controller, the first thing we have
to understand a little bit is how Kafka replication works — that is, the replication within the Kafka cluster. When you publish data to a Kafka topic today, we allow you to store the data redundantly in multiple copies, and those different copies are stored on different brokers. This picture shows a little bit of how this works: you have a cluster of four brokers, and in this case a replication factor of three. Each of those little boxes corresponds to a replica of a partition, and boxes of the same color correspond to different replicas of the same partition. As you can see, after you create this topic, the replicas of the different partitions are distributed across all the brokers. If any of those brokers goes down, there is another live replica that has exactly the same data and can serve it. So that gives you higher availability, and potentially durability as well. Now, a little more detail
of how this works. If you zoom into a particular partition, this shows the three replicas for just one partition. One of the replicas — in this case the replica on Broker 1 — is designated as the leader, and the other two replicas are the followers. The flow is: all writes go to the leader replica. The leader takes each write and appends it to its local log, and the followers just ask the leader for new data. Once a follower gets the data from the leader, it writes the data to its local log in exactly the same order the leader has decided. So over time, all the replicas will have exactly the same data, in exactly the same order the leader has decided.

If you have a producer, you can decide when the producer gets an acknowledgement of the data. The producer can either just wait for the leader to get it, or, if you want the data to be persisted and not lost on any leader failure, it can choose to wait until the data is fully replicated to all the replicas. That's the other mode. In any case, the way we expose data to the consumer is: only when the data is fully committed — meaning it is actually fully replicated to the leader and the followers — do we expose it to the readers. So the reader always sees committed data, and it's always consistent, because every time the leader fails, you can switch to another replica that's guaranteed to have the previously committed data. So that's, at a high level, how the replication works. As you can see, we do have this concept of leaders and followers, and when the leader fails, we need some logic to select who should be the new leader. That's essentially the role of the controller.
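The committed-data rule just described — a record becomes visible to consumers only once it is replicated to every in-sync replica — can be sketched as a tiny simulation. This is an illustration only: the `high_watermark` helper and the broker labels are invented for the example, not Kafka's actual code.

```python
# Toy model of Kafka's commit rule: a record is "committed" (visible to
# consumers) only once every in-sync replica has replicated it.

def high_watermark(replica_logs):
    """The committed offset is the minimum log-end-offset across replicas."""
    return min(len(log) for log in replica_logs.values())

# The leader has appended three records; the followers have fetched two.
logs = {
    "broker1 (leader)": ["m0", "m1", "m2"],
    "broker2": ["m0", "m1"],
    "broker3": ["m0", "m1"],
}

hw = high_watermark(logs)
committed = logs["broker1 (leader)"][:hw]
print(committed)   # prints ['m0', 'm1'] — m2 is not yet visible

# If the leader dies now, either follower can take over without losing
# any committed record, because both already hold everything below hw.
```

This is also why a failover to any in-sync follower is safe: by construction, every replica holds all records below the high watermark.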
This is a high-level description of what a controller does. First, there is only a single controller per Kafka cluster, and it's not a separate process: it's actually embedded in one of the brokers, so one of the brokers acts as the controller for the whole cluster. The controller does a couple of things. The first thing it does is monitor the liveness of every broker in the whole cluster, because it has to know which ones are alive and which ones are down so it can act on that. The second thing, which
is also important: on those broker changes, if one of the partitions' leaders is no longer alive, the controller is responsible for electing a new leader, and once that decision has been made, the controller is also responsible for propagating this information to the rest of the brokers in the whole cluster, so that the brokers have the new information about how to serve the data.

Next, I'm gonna walk you through some of the important things the controller has to do. The first thing is: how is the controller elected? Well, the way we elect the controller is that there's a path in Zookeeper, written here as /controller. There's a single path there. Every time you bring up a broker, the broker tries to write itself — its value — to this controller path. Of course, only one of the brokers can win the creation of this path, because if the path already exists, you cannot create the same path again; you'll get an exception. So whoever wins that creation becomes the controller. In this case the first broker, Broker 0, wins that path, so it's the controller. It actually writes a little bit of metadata about itself in this path, so that we know who the controller is.
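The "first create wins" election just described can be sketched with a toy in-memory stand-in for Zookeeper. The `FakeZookeeper` class and its `create` method are invented for this illustration; real Zookeeper would additionally make /controller an ephemeral znode, so the path disappears automatically when the controller's session dies.

```python
# Toy stand-in for Zookeeper's create-if-absent semantics: every broker
# races to create the same /controller path, and exactly one wins.

class FakeZookeeper:
    def __init__(self):
        self.paths = {}

    def create(self, path, value):
        """Atomic create: fails if the path already exists."""
        if path in self.paths:
            raise FileExistsError(f"{path} already exists")
        self.paths[path] = value

zk = FakeZookeeper()
controller = None
for broker_id in [0, 1, 2, 3]:
    try:
        zk.create("/controller", {"brokerid": broker_id})
        controller = broker_id            # this broker won the race
    except FileExistsError:
        pass                              # somebody else is the controller

print(controller)   # prints 0 — the first broker to attempt the create wins
```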
Okay, the second thing we need to talk about is that there is certain metadata state at the cluster level. For each topic partition, we maintain a little bit of metadata about that partition. The most important piece, of course, is who the current leader of the partition is. There are also some other things, like which replicas are in sync — that's also part of the metadata, I just didn't write it down on the slide. All this information is pretty mission critical. Because this partition-level metadata is mission critical, we need to store it so that it can survive individual broker failures. So currently all this metadata is persisted in Zookeeper, with each partition's data in a separate Zookeeper path. But the controller also needs this information for responding to client requests and for propagating it when a broker is restarted, so for efficiency the controller also keeps an in-memory cache of the same data, in the Java heap of the controller broker. So that's the second thing: the controller has state that it has to manage for the whole cluster, and that state is backed by persistent Zookeeper storage.

Next, let's walk through some of the common operations the controller has to do. The first thing the
controller does is react to broker failures. The most common "failure" of a broker is actually what we call a controlled shutdown, meaning that the broker is fine — there is nothing wrong with it — but an administrator wants to stop the broker, for example to push some new configuration or to deploy new code. We have some logic to make that process faster and to minimize the impact on clients when you have this kind of controlled shutdown.

This slide describes the process. The first thing we do, if we want to shut down a broker — let's say, in this case, Broker 1 — is send a SIGTERM signal to this broker in step one. Once this broker receives the SIGTERM signal, it sends an internal request to the controller to indicate that it intends to shut down. Once the controller receives this, it has to act on it. In particular, it has to select new leaders for the partitions led by the broker that is about to shut down. In this case, we can see that Broker 1, which is about to shut down, has two partitions, and it's the leader for both. Of course, we know that when this broker goes down, it cannot remain the leader. So what we want to do is act a little bit proactively: before Broker 1 actually goes down, we want to move the leadership of these two partitions to some other brokers, so that hopefully by the time it shuts down, there's no impact on the live traffic.

That's what the controller does in step three: it has to change the leader for these two partitions. In this case, Broker 2 is alive and has a replica of both partitions, so the controller will change the leader of both partitions from Broker 1 to Broker 2. Before it does that, it has to do a couple of things. Because this is pretty critical metadata, it has to persist it: the first thing it does is write all this information to Zookeeper. Once this is persisted and we know the information can't be lost, the controller can communicate the new leaders to the rest of the brokers, in this case Broker 2. At that point, it can also acknowledge Broker 1, which initiated the controlled shutdown request. Now the controlled shutdown can complete and Broker 1 can shut down. At that point, since we have already moved the leaders away from Broker 1, there's no traffic to Broker 1, so actually shutting the broker down is fine, because it's not really impacting the clients. So that's the overall controlled shutdown logic.
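The shutdown sequence above can be sketched as a small simulation — purely illustrative, with invented names (`controlled_shutdown`, a plain dict standing in for the Zookeeper store), not Kafka's actual classes. The key ordering it preserves is the one from the talk: persist the new leader to the store first, then update the cluster's view.

```python
# Toy walk-through of a controlled shutdown: move leadership off the
# shutting-down broker, persist each change, then let the broker stop.

def controlled_shutdown(leaders, replicas, zk, shutting_down, live_brokers):
    """leaders: partition -> leader broker id; replicas: partition -> replica list."""
    for partition, leader in list(leaders.items()):
        if leader != shutting_down:
            continue
        # Step 3: elect a live replica of this partition as the new leader.
        new_leader = next(b for b in replicas[partition]
                          if b != shutting_down and b in live_brokers)
        zk[f"/partitions/{partition}"] = new_leader   # persist first
        leaders[partition] = new_leader               # then propagate
    return leaders

zk_store = {}
leaders = {"p0": 1, "p1": 1, "p2": 3}
replicas = {"p0": [1, 2, 3], "p1": [1, 2], "p2": [2, 3]}

controlled_shutdown(leaders, replicas, zk_store,
                    shutting_down=1, live_brokers={2, 3})
print(leaders)   # prints {'p0': 2, 'p1': 2, 'p2': 3} — broker 1 leads nothing
```

Once the function returns, no partition is led by the shutting-down broker, which is exactly the condition that lets it stop without impacting live traffic.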
What are some of the issues with this in some of the earlier versions of Kafka? Well, there are a couple. The first issue: remember that in step three, the controller has to persist to Zookeeper all the new leader information for each of the topic partitions whose leader has changed. In the earlier versions, this persisting step was serial, meaning we would write one partition's new value to Zookeeper, wait until it completed, and then proceed to the next partition. So if you have lots of partitions, the total amount of time this takes can be long, which impacts how long it takes a broker to shut down in a controlled way, and can affect, for example, your whole cluster's rolling restart time. That's the first issue. The second issue is the communication of the new metadata — who the new leader is — from the controller to the other brokers. In some of the earlier versions, this communication was not batched: we would send a single remote request for each partition. So again, if you have lots of partitions, you end up with lots of small requests queued up at a broker, and each request takes a bit of the broker's CPU time to process. That can add up and impact the load on the broker. So those are the two problems with controlled shutdown; I'll talk about what we did to improve them a bit later.

The second thing the controller often does is controller failover. When you do a rolling restart
of the whole cluster, at some point you have to restart the controller broker. When the controller fails, of course, we need a process to select another live broker as the new controller; that's what this process does. Let's say Broker 0 was the old controller and it fails. After it fails, its path in Zookeeper — because it's an ephemeral path — will be gone. So now everybody will be notified that there's no longer a controller for the whole cluster, and they will all try to elect themselves as the new controller by trying to write to that path. Let's say another broker, in this case Broker 2, wins that creation; now Broker 2 is the new controller. This all happens automatically, without any human intervention. But one of the things this new controller has to do is load up its state, because remember, as I mentioned earlier, we have all that state persisted in Zookeeper: the metadata for each of the topic partitions. This is pretty critical information. For the controller to start working as a controller, it needs to have all that state loaded in memory. So when you have a controller failover, the first thing the new controller has to do is load all the partition-level state into its memory before it can act as the new controller. That's one of the important things in a controller failover. Now, what are some issues
with the controller failover in some of the earlier versions of Kafka? There are a couple. The first issue is that the reads from Zookeeper into the new controller are, similar to before, done serially: we read one partition at a time. So again, if you have lots of partitions, this can add up. And it matters because when the controller fails over, it has to read all the partitions in the cluster, not just the partitions it hosts locally, since it needs the state for the whole cluster. So if you have lots of partitions at the cluster level, this process can take long, and of course the longer it takes, the more delayed the controller is before it can do useful things like electing new leaders. So if this process is long, it can affect the availability of the cluster.

Another issue we had relates to the old controller. In the common case, when you have a controller failover, the old controller is really gone, so it can't do anything once there is a new controller. But in some rare cases, the old controller didn't really physically die: it could be caught in a long GC, a garbage collection. Because of that, it can lose its session to the Zookeeper server; the Zookeeper server treats this broker as temporarily gone, and a new controller is elected. Now the question is: what happens when this old controller wakes up from its long GC? There's a new controller acting now, and we don't want the old controller interfering with it. In the older versions of Kafka, the fencing logic was a little bit weak, so in some rare cases, when that old controller woke up from a GC, it might do something that conflicts with the new controller, which can lead to inconsistency at the cluster level. So that's another issue. Next, I'm gonna talk about
some of the improvements we have done in this space. The first is some performance improvements that actually shipped in Apache Kafka 1.1, about a year and a half ago. One of the things we did is improve the way the controller interacts with Zookeeper when doing reads and writes. Remember, earlier I talked about how a lot of the performance issues came from doing things with Zookeeper serially: if we want to write four partitions' worth of data, we'd wait for the first partition's metadata write to complete and then move on to the next partition. In the improved version, we made this pipelined, using Zookeeper's asynchronous API. If we want to write multiple partitions, we initiate the write for the first partition, and before it completes, we initiate the second one, and the third one, and so on. As those writes are in flight, some of them can potentially be batched together and done in parallel, and at the end we just wait for everything to complete. As you can see, because of this pipelining, the overall latency can be shortened a lot, especially when you have more partitions.
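The serial-versus-pipelined difference can be demonstrated with a toy store whose writes each cost a fixed simulated round trip. The `write` function and its 10 ms latency are invented stand-ins — the real change uses Zookeeper's async API rather than threads — but the latency-hiding effect is the same.

```python
# Toy comparison of serial vs pipelined writes to a slow store.
# Each write takes ~10 ms of simulated network latency.
import concurrent.futures
import time

LATENCY = 0.01

def write(path, value):
    time.sleep(LATENCY)          # simulated round trip to the store
    return path

partitions = [f"/partitions/p{i}" for i in range(20)]

# Serial: wait for each write before issuing the next (~20 x 10 ms).
start = time.monotonic()
for p in partitions:
    write(p, "new-leader")
serial_time = time.monotonic() - start

# Pipelined: issue all writes up front, then wait for all completions.
start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
    futures = [pool.submit(write, p, "new-leader") for p in partitions]
    concurrent.futures.wait(futures)
pipelined_time = time.monotonic() - start

print(f"serial {serial_time:.3f}s vs pipelined {pipelined_time:.3f}s")
```

With 20 partitions the serial pass pays the round-trip latency 20 times, while the pipelined pass pays it roughly once — which is why the win grows with the partition count.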
So that's the first thing we did to improve the performance. The second thing we did: when the controller has to communicate its metadata to other brokers, we also batch the communication. Instead of sending one partition per request to the other brokers, the controller now sends, hopefully, just a single request including all the partitions. That also reduces the number of requests each broker has to handle when you have this failover. Okay, so now let's look at
where these improvements apply. First, let's revisit the controlled shutdown process. Remember, earlier we had the bottleneck in step three, where we have to persist all that metadata to Zookeeper during a controlled shutdown. After Apache Kafka 1.1, all those writes go through the pipeline, so this is a big improvement to this step. And the communication from the controller to the other brokers is also batched, which improves the second part of the controlled shutdown. If we look at controller failover: when the controller fails over, it has to read this data from Zookeeper into its memory, and this process also uses the pipelined reads from Zookeeper, which can hide a lot of the Zookeeper latency.

Now, let's look at some of the performance numbers. We ran some tests to see how big an impact these changes have. This is the setup: a five-node Zookeeper ensemble, five brokers, and roughly tens of thousands of partitions, with each broker holding about 10k partition replicas. The first test measures the time to do a controlled shutdown of a single broker with this setup. Before 1.1, this process could take more than six minutes, because you have tens of thousands of partitions to move. With the fix in 1.1, it takes three seconds. So it's a big improvement. The second test measures the time to complete a controller failover; a big part of that is for the controller to finish loading the state from Zookeeper. Again, remember that during the failover, the controller has to load the partition metadata for the whole cluster into its cache. In this case, it's about 2k topics times 50 partitions each — about 100k partitions it has to load from Zookeeper. Before the improvements, this process took about 28 seconds; by pipelining the reads from Zookeeper, we cut it roughly in half. So it's also a big improvement. So next, I'm gonna talk about
some of the improvements we did in terms of fencing zombie requests from the controller. There are a few different flavors of fencing we have to do. The first case is that the zombie request can come from a controller that is itself a zombie. I gave you an example earlier: it's possible for a controller to lose its controllership not because it actually failed, but because it had a long GC, a garbage collection. In this case, we need to make sure that when this controller wakes up from the GC, it can't do any damage to the cluster: it can't send zombie requests, and it can't write any zombie data to Zookeeper. That part we actually already did in Kafka 1.1, as part of adopting the new set of Zookeeper APIs that allow us to do the pipelining.

Another case: maybe the old controller is alive, but a clumsy administrator manually deleted the controller path in Zookeeper. That also triggers the election of a new controller, and in that case the old controller hasn't even lost its session — it's just, magically, not the controller anymore, because something has changed underneath it. In that case, too, we want to make sure that as soon as the new controller is elected, the old controller, even though it's alive, can't do any damage to the cluster. This is done by a couple of things. One is that we added a little bit of logic to how the controller persists its metadata to Zookeeper. With the change in Kafka 2.2, for a controller to successfully write a value to Zookeeper, the controller epoch it has cached in memory has to match the controller epoch that's actually written in Zookeeper. There's a separate path in Zookeeper where we register the latest controller epoch: every time the controller changes, the new controller has to bump that epoch, and each controller keeps its epoch in memory. Zookeeper has an API for conditional writes: you can associate a condition with a write, and Zookeeper makes sure the write is allowed only if the condition passes. So we changed the logic so that all the writes the controller does are conditioned on a matching controller epoch. With this, we can fence zombie writes from the old controller to Zookeeper. Of course, all the requests from the controller to the brokers also carry this controller epoch, so a broker can likewise fence requests from an older incarnation of the controller as soon as it has seen the new one.
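The epoch-conditioned write just described can be sketched with a toy store. Everything here is invented for illustration (`EpochFencedStore`, `conditional_write`): real Zookeeper expresses the condition through znode versions and multi-ops rather than an explicit epoch argument, but the fencing effect is the same.

```python
# Toy conditional write keyed on the controller epoch: a stale (zombie)
# controller whose cached epoch no longer matches is rejected.

class EpochFencedStore:
    def __init__(self):
        self.controller_epoch = 0
        self.data = {}

    def conditional_write(self, path, value, expected_epoch):
        """Apply the write only if the caller's epoch is still current."""
        if expected_epoch != self.controller_epoch:
            raise PermissionError("stale controller epoch, write fenced")
        self.data[path] = value

store = EpochFencedStore()

old_epoch = store.controller_epoch    # zombie controller's cached epoch: 0
store.controller_epoch += 1           # a new controller is elected
new_epoch = store.controller_epoch    # the new controller caches epoch 1

store.conditional_write("/partitions/p0", "leader=2", new_epoch)  # allowed

try:
    # The zombie wakes up from its GC pause and tries to write.
    store.conditional_write("/partitions/p0", "leader=1", old_epoch)
except PermissionError as err:
    print(err)   # prints: stale controller epoch, write fenced
```

Because the zombie's write is rejected rather than applied, the new controller's metadata can never be silently overwritten by a stale one.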
Another type of zombie request, which gets a little bit tricky, is one where the controller is okay — it's actually the right controller — but a broker has been restarted. When the broker is restarted, it may receive a request from the right controller that was generated based on the previous incarnation of that broker. For example, the controller was responding to an event for one broker and was about to send the request, and in the meantime the broker went down and came back up again. Now this broker is receiving something from the right controller, but the request is based on the broker's previous generation, so it can carry information that's not suitable for the current incarnation of the broker. Earlier, we didn't really have a way to distinguish that, but we also fixed it in Kafka 2.2. The way we did it is that we added an epoch for the broker as well: every time a broker is restarted, it's registered with a new epoch. Then we can distinguish between two different generations of the broker, and based on that, we can decide whether to accept a request from the controller or ignore it.
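The broker-side check can be sketched the same way — again a toy, with invented names (`Broker`, `handle`), and the request shape is made up for the example rather than being Kafka's actual wire format.

```python
# Toy broker-side check: a request stamped with an old broker epoch
# (generated before the broker restarted) is ignored.

class Broker:
    def __init__(self):
        self.epoch = 0

    def restart(self):
        self.epoch += 1      # each restart registers a fresh broker epoch

    def handle(self, request):
        """Accept the request only if it targets this incarnation."""
        if request["broker_epoch"] != self.epoch:
            return "ignored (stale broker epoch)"
        return "applied"

broker = Broker()
stale = {"type": "LeaderAndIsr", "broker_epoch": broker.epoch}  # epoch 0
broker.restart()                                                # now epoch 1
fresh = {"type": "LeaderAndIsr", "broker_epoch": broker.epoch}

print(broker.handle(stale))   # prints: ignored (stale broker epoch)
print(broker.handle(fresh))   # prints: applied
```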
This allows us to be more protective against those zombie requests, even ones from the right controller. Okay, so to summarize: these are some of the improvements we have made in recent versions of Apache Kafka, both in terms of performance and in terms of correctness. The controller is now in a much better state in terms of scalability, consistency, and correctness. Before I open up for questions: these are the people who contributed to all the work I presented, and I think some of them are probably sitting in this audience, so I just want to give them a big round of applause. And then we can take some questions. (crowd applauding)

– Out of curiosity, have you tried to see, for a large number of brokers in your cluster, how the performance numbers would be different? Or is that not really relevant?

– Yeah, we haven't tried this on a large number of brokers. My expectation is the results won't necessarily change too much, because there are two things that are important. When you do the controlled shutdown, what really matters is the number of partitions assigned to the shutting-down broker, and that's irrespective of the size of the cluster in terms of brokers. The second thing is the controller failover time, and that is mostly proportional to the total number of partitions in the whole cluster; whether they sit on fewer brokers or more brokers is secondary. But in general, it is true that the more partitions you have in the whole cluster, the more brokers you typically need, so it is correlated a bit in that way.

– Yeah, thank you.

– Any other questions?

– I had a question. I'm curious what you think the scalability bottlenecks are to getting to, like, thousands of brokers, or millions or tens
of millions of partitions? Like, where do you think the bottleneck for that is?

– Yeah, that's a good question. In terms of scalability, of course, longer term we want to make further improvements to make sure we can scale up to more partitions at the cluster level. There are a couple of things to consider. One is just the raw latency. As I mentioned earlier, when you have a controller failover, you still need to wait for the controller to load up all the metadata in memory before it can work as a controller. We improved that, but in the case of a hard failure of the controller, you still need to wait for that amount of time, which is in some sense proportional to the number of partitions. One way to improve that is to make that part active as well: if you can have another broker actively caching the same metadata, then in the case of a controller failover you don't really have to wait for that latency, because the cache is already warm. That can improve the scalability a lot, from that perspective.

Another aspect is how you communicate that metadata from the controller to the brokers. Today, when a broker is restarted, it initially has no metadata; it has to get everything from the controller. Of course, the more partitions you have, the more information the controller has to communicate to the broker, and at some point that just means you need a very large remote request to propagate this metadata. So that's a second, related thing. If we can make that part more incremental, then potentially we can also scale up the absolute number of partitions you can have per broker. So I think if we continue on our current path with some incremental changes, we can potentially scale up to maybe close to million-level partition counts, and if we make a lot of the parts more incremental, so we don't have to send the whole thing every time a broker is restarted, I think we can make that number even bigger. Any other questions?

– [Member of Audience]
Hi, have you considered using some formal methods to verify the algorithm in some way, and proactively find edge cases?

– Yeah, that's actually a good question. Jason from Confluent gave a pretty well-received talk at the last Kafka Summit in San Francisco, and a big part of that is about using formal verification of Kafka's replication model. A lot of that is based on Lamport's replicated state machine approach. We actually did find quite a few real bugs that we hadn't caught with all our system tests — the model check captured them — and we have made a lot of improvements based on that. Pretty much all of those have been contributed back to Apache Kafka. The model checker can verify states very quickly: it can check millions of states within just a few seconds, and earlier we would hit an issue within just a few seconds of starting a run. Now, if you run the same model check, it can run for probably hours without failing. Any other questions? There's one over there.

– [Member of Audience]
Are there any plans to use the data from KIP-380 to facilitate rolling updates in the future, for zero-downtime upgrades?

– Yeah, actually, could you remind me what KIP-380 is?

– [Member of Audience] Just one slide back, the epoch.

– It's on, oh.

– [Member of Audience] Oh, two slides, yeah.

– Okay, yeah, to do–

– [Member of Audience] Rolling updates, in-place updates, live with no downtime.

– Well, today, if you have redundancy, you can actually do rolling upgrades already with all the controlled shutdown logic. You can bring any broker down, and we'll make sure the other replicas take over the serving of the data before you actually shut it down. This is the pretty standard rolling upgrade procedure. And I think you need that epoch a little bit just to prevent the case where you are restarting a broker too quickly. So that definitely helps. But other than that, I think the standard rolling restart of the cluster should already be helpful in terms of keeping the cluster highly available.

– [Host] Maybe I can add a few words about the upgrades. On the Apache Kafka web page, we actually have an Upgrade section, updated whenever a new major or minor release comes out. Besides the rolling-bounce procedure, maybe one tricky thing people have to be aware of is that whenever there is an internal protocol change, you probably need to consider doing two rolling bounces: in the first one you stick with the old internal protocol, and then you do a second rolling bounce upgrading to the new protocol. But generally speaking, you should not have downtime upgrading your Kafka brokers.

– Okay, cool.

– [Host] Any other questions? Okay, so let's thank Jun again.

– All right, thanks.
– Thank you. (crowd applauding) Yeah, and I want to thank everyone for coming to our first Confluent Kafka Meetup as well. In the future, we are going to host more Kafka Meetups in both our Palo Alto headquarters and our San Francisco office, where we can share the cutting-edge work that Confluent has been doing with the community on Apache Kafka, as well as its ecosystem and operations. So we are looking forward to seeing you again at future events. Thank you. (crowd applauding)
