kpax
a modular & idiomatic Kafka client in Go
21 August 2016
Hǎi-Liàng "Hal" Wáng
Gopher at Appcoach
About Me
About Appcoach
No resources to build our own messaging system from scratch :-)
Kafka has excellent design and robust implementation:
Disappointed by the unnecessary complexity of sarama, siesta & go_kafka_client
Gain a better understanding of Kafka by writing a client, and learn how to troubleshoot when problems occur
From http://kafka.apache.org/protocol.html:
MessageSet => <OffsetMessage>
OffsetMessage => Offset SizedMessage
Offset => int64
SizedMessage => Size CrcMessage
Size => int32
CrcMessage => Crc Message
Crc => uint32
Message => MagicByte Attributes Key Value
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
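This grammar maps naturally onto Go structs. A sketch of what the corresponding types could look like, derived directly from the productions above (the actual definitions in h12.io/kpax/proto may differ):

// Sketch only: struct shapes inferred from the grammar above.
type MessageSet []OffsetMessage

type OffsetMessage struct {
	Offset       int64
	SizedMessage SizedMessage
}

type SizedMessage struct {
	Size       int32
	CrcMessage CrcMessage
}

type CrcMessage struct {
	Crc     uint32
	Message Message
}

type Message struct {
	MagicByte  int8
	Attributes int8
	Key        []byte
	Value      []byte
}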
By h12.io/wipro:

func (t *MessageSet) Marshal(w *wipro.Writer) {
	offset := len(w.B)
	w.WriteInt32(0) // reserve 4 bytes for the message set size
	start := len(w.B)
	for i := range *t {
		(*t)[i].Marshal(w)
	}
	w.SetInt32(offset, int32(len(w.B)-start)) // backfill the size
}

func (t *Message) Marshal(w *wipro.Writer) {
	w.WriteInt8(t.MagicByte)
	w.WriteInt8(t.Attributes)
	w.WriteBytes(t.Key)
	w.WriteBytes(t.Value)
}
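To see how the grammar maps to bytes, a marshaled Message can be inspected roughly like this (a sketch; it assumes a zero-value wipro.Writer is usable and that its buffer is exposed as the B field, as the code above suggests):

var w wipro.Writer
msg := Message{
	MagicByte:  0,
	Attributes: 0,
	Key:        []byte("k"),
	Value:      []byte("v"),
}
msg.Marshal(&w)
// w.B now holds MagicByte, Attributes, then the length-prefixed Key and Value,
// in the order given by the Message production above.
fmt.Printf("% x\n", w.B)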
"It should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling)"
"Clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput"
Modular and reusable sub-packages:
h12.io/kpax/
	proto
	broker
	cluster
	producer
	consumer
	log

Broker > Cluster > Producer
Produce Methods:
func (b *Broker) Produce(.......)
func (c *Cluster) Produce(.......)
func (p *Producer) Produce(.......)
Constructors:

broker.New(cfg *BrokerConfig)
cluster.New(cfg *ClusterConfig)
producer.New(cfg *ProducerConfig)
Problems with this approach:
Each layer is built from its own concrete config and calls directly into the concrete type below it, so the layers are hard to swap or to test in isolation.
DIP (Dependency Inversion Principle): depend on abstractions, not on concrete types.
In our case:
type Broker interface {
	Do(Request, Response) error
	Close()
}

type Cluster interface {
	Coordinator(group string) (Broker, error)
	CoordinatorIsDown(group string)
	Leader(topic string, partition int32) (Broker, error)
	LeaderIsDown(topic string, partition int32)
	Partitions(topic string) ([]int32, error)
}

type Request interface {
	Send(io.Writer) error
	ID() int32
	SetID(int32)
}

type Response interface {
	Receive(io.Reader) error
	ID() int32
}
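One payoff of inverting the dependencies this way: anything that takes a model.Cluster can be exercised against an in-memory fake. A minimal sketch (fakeCluster and fakeBroker are hypothetical test doubles, not part of kpax):

type fakeBroker struct{}

func (fakeBroker) Do(req model.Request, resp model.Response) error {
	// Pretend the request succeeded without touching the network.
	return nil
}
func (fakeBroker) Close() {}

type fakeCluster struct{ leader fakeBroker }

func (f fakeCluster) Coordinator(group string) (model.Broker, error) { return f.leader, nil }
func (f fakeCluster) CoordinatorIsDown(group string)                 {}
func (f fakeCluster) Leader(topic string, partition int32) (model.Broker, error) {
	return f.leader, nil
}
func (f fakeCluster) LeaderIsDown(topic string, partition int32) {}
func (f fakeCluster) Partitions(topic string) ([]int32, error)   { return []int32{0}, nil }

With a fake like this, the Leader/LeaderIsDown failover path in Payload.Produce (next) can be exercised without a running Kafka.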
Payload.Produce

func (p *Payload) Produce(c model.Cluster) error {
	leader, err := c.Leader(p.Topic, p.Partition)
	if err != nil {
		return err
	}
	if err := p.DoProduce(leader); err != nil {
		if IsNotLeader(err) {
			// Mark the cached leader as down so the next lookup refreshes metadata.
			c.LeaderIsDown(p.Topic, p.Partition)
		}
		return err
	}
	return nil
}

func (p *Payload) DoProduce(b model.Broker) error {
	......
	if err := b.Do(&req, &resp); err != nil {
		return err
	}
	......
}
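How a caller might use this failover signal (a sketch, not kpax's actual API): retry a failed produce once or twice, so that the next attempt looks up the fresh leader.

// produceWithRetry is a hypothetical helper built only on the methods shown above.
func produceWithRetry(c model.Cluster, p *Payload, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = p.Produce(c); err == nil {
			return nil
		}
		// On a NotLeader error, Produce has already called LeaderIsDown,
		// so the next Leader lookup will refresh the metadata.
	}
	return err
}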
type Payload struct {
	Topic        string
	Partition    int32
	MessageSet   MessageSet
	RequiredAcks ProduceAckType
	AckTimeout   time.Duration
}
type Broker struct {
	Addr    string
	Timeout time.Duration
}
type Producer struct {
	Cluster      model.Cluster
	RequiredAcks proto.ProduceAckType
	AckTimeout   time.Duration
}
NewCluster(newBroker NewBrokerFunc, brokers []string) model.Cluster

The "New" constructor returns an object with default configuration.
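Roughly how the pieces then compose (a sketch; apart from NewCluster's signature above, the constructors and whether broker.New satisfies NewBrokerFunc are assumptions, not the real kpax API):

// Hypothetical wiring under the interface-based design.
c := NewCluster(broker.New, []string{"kafka-1:9092", "kafka-2:9092"})
p := producer.Producer{Cluster: c} // the producer depends only on model.Cluster
// p.Produce(...) then goes through c.Leader / c.LeaderIsDown as shown earlier.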
go install h12.io/kpax/cmd/kpax
Sub-commands:
Using h12.io/realtest/kafka:
func TestXXX(t *testing.T) {
	k, err := kafka.New()
	if err != nil {
		t.Fatal(err)
	}
	partitionCount := 2
	topic, err := k.NewRandomTopic(partitionCount)
	if err != nil {
		t.Fatal(err)
	}
	defer k.DeleteTopic(topic)
	// Your test code goes here
}

Manual checking:
Continuous monitoring:
Crash consistency is hard but possible at a huge performance cost (fsync)
To get short response times, perhaps we should either give up persistence, or give up manual fsync and rely on replication instead.
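For concreteness, "manual fsync" here is just an explicit sync after each write, e.g. in plain Go (nothing kpax-specific):

import (
	"log"
	"os"
)

func appendRecord(path string, record []byte) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Write(record); err != nil {
		log.Fatal(err)
	}
	// Sync forces the data onto stable storage; this per-write fsync is where
	// the latency cost comes from. Skipping it and relying on replication
	// trades single-node crash consistency for response time.
	if err := f.Sync(); err != nil {
		log.Fatal(err)
	}
}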