kpax
a modular & idiomatic Kafka client in Go
21 August 2016
Hǎi-Liàng "Hal" Wáng
Gopher at Appcoach
About Me
About Appcoach
No resources to build our own messaging system from scratch :-)
Kafka has an excellent design and a robust implementation:
Disappointed by the unnecessary complexity of sarama, siesta & go_kafka_client
Gain a better understanding of Kafka by writing a client, and learn how to troubleshoot when problems occur
From http://kafka.apache.org/protocol.html:
	MessageSet => <OffsetMessage>
	OffsetMessage => Offset SizedMessage
	Offset => int64
	SizedMessage => Size CrcMessage
	Size => int32
	CrcMessage => Crc Message
	Crc => uint32
	Message => MagicByte Attributes Key Value
	MagicByte => int8
	Attributes => int8
	Key => bytes
	Value => bytes
By h12.io/wipro:
	func (t *MessageSet) Marshal(w *wipro.Writer) {
		offset := len(w.B)
		w.WriteInt32(0)
		start := len(w.B)
		for i := range *t {
			(*t)[i].Marshal(w)
		}
		w.SetInt32(offset, int32(len(w.B)-start))
	}

	func (t *Message) Marshal(w *wipro.Writer) {
		w.WriteInt8(t.MagicByte)
		w.WriteInt8(t.Attributes)
		w.WriteBytes(t.Key)
		w.WriteBytes(t.Value)
	}
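The size-prefix pattern in MessageSet.Marshal (reserve four bytes, write the body, then patch the size back in) can be tried out with the standard library alone. This is a minimal sketch, not wipro itself: the writer type and its methods are hypothetical stand-ins for wipro.Writer, and encoding a nil byte slice as length -1 is an assumption based on the Kafka protocol guide.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// writer is a hypothetical stand-in for wipro.Writer: an append-only buffer.
type writer struct{ b []byte }

func (w *writer) writeInt8(v int8) { w.b = append(w.b, byte(v)) }

// writeInt32 appends v in big-endian order, as the Kafka protocol requires.
func (w *writer) writeInt32(v int32) {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], uint32(v))
	w.b = append(w.b, tmp[:]...)
}

// setInt32 overwrites the four bytes starting at offset.
func (w *writer) setInt32(offset int, v int32) {
	binary.BigEndian.PutUint32(w.b[offset:], uint32(v))
}

// writeBytes writes an int32 length followed by the data.
// Assumption: nil is encoded as length -1 with no data.
func (w *writer) writeBytes(p []byte) {
	if p == nil {
		w.writeInt32(-1)
		return
	}
	w.writeInt32(int32(len(p)))
	w.b = append(w.b, p...)
}

// encodeSized reserves a size slot, runs body, then back-patches the slot
// with the number of bytes that body appended.
func encodeSized(w *writer, body func()) {
	offset := len(w.b)
	w.writeInt32(0) // placeholder
	start := len(w.b)
	body()
	w.setInt32(offset, int32(len(w.b)-start))
}

func main() {
	w := &writer{}
	encodeSized(w, func() {
		w.writeInt8(0)             // MagicByte
		w.writeInt8(0)             // Attributes
		w.writeBytes(nil)          // Key
		w.writeBytes([]byte("hi")) // Value
	})
	fmt.Printf("% x\n", w.b) // prints "00 00 00 0c 00 00 ff ff ff ff 00 00 00 02 68 69"
}
```

Back-patching avoids computing the encoded size up front, so nested structures can be marshalled in a single pass.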
Kafka Wire Protocol:
"It should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling)"
"Clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput"
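One way to read the two quotes together: keep a single connection per broker and let several requests be in flight on it, matching responses to callers by correlation ID. The sketch below illustrates that idea only. The frame layout, the pipeline type, and the in-memory "broker" that doubles each value are all invented for the example and are not kpax's implementation or the real Kafka framing.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"sync"
)

// frame is a deliberately tiny wire format invented for this sketch:
// a correlation ID followed by a fixed-size int32 payload.
type frame struct {
	ID, Value int32
}

// pipeline keeps many requests in flight on one connection.
type pipeline struct {
	conn    net.Conn
	mu      sync.Mutex // guards nextID, pending, and writes to conn
	nextID  int32
	pending map[int32]chan int32
}

func newPipeline(conn net.Conn) *pipeline {
	p := &pipeline{conn: conn, pending: make(map[int32]chan int32)}
	go p.receive()
	return p
}

// send writes a request and returns without waiting for the response,
// which is delivered later on the returned channel.
func (p *pipeline) send(value int32) (<-chan int32, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	id := p.nextID
	p.nextID++
	ch := make(chan int32, 1)
	p.pending[id] = ch
	if err := binary.Write(p.conn, binary.BigEndian, frame{ID: id, Value: value}); err != nil {
		delete(p.pending, id)
		return nil, err
	}
	return ch, nil
}

// receive routes each response to its caller by correlation ID.
func (p *pipeline) receive() {
	for {
		var f frame
		err := binary.Read(p.conn, binary.BigEndian, &f)
		p.mu.Lock()
		if err != nil {
			for id, ch := range p.pending { // unblock waiters when the connection fails
				close(ch)
				delete(p.pending, id)
			}
			p.mu.Unlock()
			return
		}
		ch := p.pending[f.ID]
		delete(p.pending, f.ID)
		p.mu.Unlock()
		if ch != nil {
			ch <- f.Value
		}
	}
}

// doubleDemo pipelines two requests to an in-memory server that doubles
// each value, and only then waits for the responses.
func doubleDemo() (int32, int32, error) {
	client, server := net.Pipe()
	defer client.Close()
	go func() { // stand-in for a broker
		defer server.Close()
		for {
			var f frame
			if binary.Read(server, binary.BigEndian, &f) != nil {
				return
			}
			f.Value *= 2
			if binary.Write(server, binary.BigEndian, f) != nil {
				return
			}
		}
	}()
	p := newPipeline(client)
	a, err := p.send(1)
	if err != nil {
		return 0, 0, err
	}
	b, err := p.send(2)
	if err != nil {
		return 0, 0, err
	}
	return <-a, <-b, nil
}

func main() {
	fmt.Println(doubleDemo()) // prints "2 4 <nil>"
}
```

Both requests are written before either response is awaited, which is the point of pipelining: the caller pays one round trip for the batch instead of one per request.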
Modular and reusable sub-packages:
	h12.io/kpax/
		proto
		broker
		cluster
		producer
		consumer
		log
Broker > Cluster > Producer
Produce Methods:
	func (b *Broker) Produce(.......)
	func (b *Cluster) Produce(.......)
	func (b *Producer) Produce(.......)
Constructors:
	broker.New(cfg *BrokerConfig)
	cluster.New(cfg *ClusterConfig)
	producer.New(cfg *ProducerConfig)
Problems of this approach
DIP:
In our case:
	type Broker interface {
		Do(Request, Response) error
		Close()
	}

	type Cluster interface {
		Coordinator(group string) (Broker, error)
		CoordinatorIsDown(group string)
		Leader(topic string, partition int32) (Broker, error)
		LeaderIsDown(topic string, partition int32)
		Partitions(topic string) ([]int32, error)
	}

	type Request interface {
		Send(io.Writer) error
		ID() int32
		SetID(int32)
	}

	type Response interface {
		Receive(io.Reader) error
		ID() int32
	}
Payload.Produce
	func (p *Payload) Produce(c model.Cluster) error {
		leader, err := c.Leader(p.Topic, p.Partition)
		if err != nil {
			return err
		}
		if err := p.DoProduce(leader); err != nil {
			if IsNotLeader(err) {
				c.LeaderIsDown(p.Topic, p.Partition)
			}
			return err
		}
		return nil
	}

	func (p *Payload) DoProduce(b model.Broker) error {
		......
		if err := b.Do(&req, &resp); err != nil {
			return err
		}
		......
	}
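Because Produce depends only on interfaces, its leader-failover logic can be exercised without a running broker. The sketch below shows that with hand-written fakes. The interfaces are trimmed and simplified (Broker.Produce replaces Do(Request, Response)), and errNotLeader, fakeBroker, and fakeCluster are hypothetical names introduced for this example only.

```go
package main

import (
	"errors"
	"fmt"
)

// Broker and Cluster are trimmed, simplified versions of the interfaces
// above; Produce is a hypothetical shortcut for Do(Request, Response).
type Broker interface {
	Produce(topic string, partition int32) error
}

type Cluster interface {
	Leader(topic string, partition int32) (Broker, error)
	LeaderIsDown(topic string, partition int32)
}

// errNotLeader is a hypothetical sentinel standing in for IsNotLeader(err).
var errNotLeader = errors.New("not leader for partition")

// produce mirrors the control flow of Payload.Produce: find the leader,
// send, and report the leader as down if it rejects the request.
func produce(c Cluster, topic string, partition int32) error {
	leader, err := c.Leader(topic, partition)
	if err != nil {
		return err
	}
	if err := leader.Produce(topic, partition); err != nil {
		if errors.Is(err, errNotLeader) {
			c.LeaderIsDown(topic, partition)
		}
		return err
	}
	return nil
}

// fakeBroker returns a canned error from every call.
type fakeBroker struct{ err error }

func (b fakeBroker) Produce(string, int32) error { return b.err }

// fakeCluster hands out a fixed leader and counts LeaderIsDown calls.
type fakeCluster struct {
	leader    Broker
	downCalls int
}

func (c *fakeCluster) Leader(string, int32) (Broker, error) { return c.leader, nil }
func (c *fakeCluster) LeaderIsDown(string, int32)           { c.downCalls++ }

func main() {
	c := &fakeCluster{leader: fakeBroker{err: errNotLeader}}
	err := produce(c, "events", 0)
	fmt.Println(err, c.downCalls) // prints "not leader for partition 1"
}
```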
	type Payload struct {
		Topic        string
		Partition    int32
		MessageSet   MessageSet
		RequiredAcks ProduceAckType
		AckTimeout   time.Duration
	}

	type Broker struct {
		Addr    string
		Timeout time.Duration
	}

	type Producer struct {
		Cluster      model.Cluster
		RequiredAcks proto.ProduceAckType
		AckTimeout   time.Duration
	}

	NewCluster(newBroker NewBrokerFunc, brokers []string) model.Cluster
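With configuration kept in plain exported fields like these, a constructor only has to fill in defaults and callers override what they need afterwards. A minimal sketch of that style follows; the field types and the default values are assumptions chosen for illustration, not kpax's actual defaults.

```go
package main

import (
	"fmt"
	"time"
)

// Producer is a cut-down, hypothetical version of the struct above.
type Producer struct {
	RequiredAcks int16
	AckTimeout   time.Duration
}

// New returns a Producer with default settings.
// The values are illustrative assumptions only.
func New() *Producer {
	return &Producer{
		RequiredAcks: 1,
		AckTimeout:   10 * time.Second,
	}
}

func main() {
	p := New()
	p.AckTimeout = 2 * time.Second // override one default, keep the rest
	fmt.Println(p.RequiredAcks, p.AckTimeout) // prints "1 2s"
}
```

No separate config struct is needed: the object is its own configuration.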
"New" constructor returns an object with default configuration.
go install h12.io/kpax/cmd/kpax
Sub-commands:
Using h12.io/realtest/kafka:
	func TestXXX(t *testing.T) {
		k, err := kafka.New()
		if err != nil {
			t.Fatal(err)
		}
		partitionCount := 2
		topic, err := k.NewRandomTopic(partitionCount)
		if err != nil {
			t.Fatal(err)
		}
		defer k.DeleteTopic(topic)

		// Your test code goes here
	}
Manual checking:
Continuous monitoring:
Crash consistency is hard but possible at a huge performance cost (fsync)
To get short response times, perhaps we should either give up persistence, or just give up manual fsync and rely on replication: