kpax

a modular & idiomatic Kafka client in Go

21 August 2016

Hǎi-Liàng "Hal" Wáng

Gopher at Appcoach

Introduction

About Me

About Appcoach

Rationale

Why Kafka?

No resources to build our own messaging system from scratch :-)

Kafka has an excellent design and a robust implementation:

Why yet another Go client for Kafka?

Disappointed by the unnecessary complexity of existing clients

Gain a better understanding of Kafka by writing a client, and learn how to troubleshoot when problems occur

Goal

Kafka Wire Protocol

BNF

From http://kafka.apache.org/protocol.html:

MessageSet => <OffsetMessage>
    OffsetMessage => Offset SizedMessage
    Offset => int64
    SizedMessage => Size CrcMessage
    Size => int32
    CrcMessage => Crc Message
    Crc => uint32

Message => MagicByte Attributes Key Value
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes
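
Each rule above maps naturally onto a Go type, which is what makes code generation attractive. A sketch of the mapping, one type per rule (field names are my assumption of what wipro emits):

// One Go type per BNF rule; names follow the grammar above.
type MessageSet []OffsetMessage

type OffsetMessage struct {
    Offset       int64
    SizedMessage SizedMessage
}

type SizedMessage struct {
    Size       int32
    CrcMessage CrcMessage
}

type CrcMessage struct {
    Crc     uint32
    Message Message
}

type Message struct {
    MagicByte  int8
    Attributes int8
    Key        []byte
    Value      []byte
}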

Code Generation

By h12.io/wipro:

func (t *MessageSet) Marshal(w *wipro.Writer) {
    offset := len(w.B)
    w.WriteInt32(0) // reserve a placeholder for the size prefix
    start := len(w.B)
    for i := range *t {
        (*t)[i].Marshal(w)
    }
    // backpatch the placeholder with the number of bytes written
    w.SetInt32(offset, int32(len(w.B)-start))
}

func (t *Message) Marshal(w *wipro.Writer) {
    w.WriteInt8(t.MagicByte)
    w.WriteInt8(t.Attributes)
    w.WriteBytes(t.Key)
    w.WriteBytes(t.Value)
}

Broker

Kafka Wire Protocol:

"It should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling)"

"Clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput"

Wrap It Up

Code Organization

Modular and reusable sub-packages:

h12.io/kpax/
    proto
    broker
    cluster
    producer
    consumer
    log

A Simple & Naive Approach

Broker > Cluster > Producer

Produce Methods:

func (b *Broker) Produce(.......)
func (c *Cluster) Produce(.......)
func (p *Producer) Produce(.......)

Constructors:

broker.New(cfg *BrokerConfig)
cluster.New(cfg *ClusterConfig)
producer.New(cfg *ProducerConfig)

Problems with this approach
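
One symptom: because each layer wraps the next, the configurations nest too, and none of the layers can be swapped out or mocked. A hypothetical sketch of the nesting:

// Hypothetical: each config must embed the config of the layer below.
type ProducerConfig struct {
    Cluster      ClusterConfig
    RequiredAcks ProduceAckType
    AckTimeout   time.Duration
}

type ClusterConfig struct {
    Broker  BrokerConfig // template for every broker connection
    Brokers []string
}

type BrokerConfig struct {
    Addr    string
    Timeout time.Duration
}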

Dependency Inversion Principle

DIP: high-level modules should not depend on low-level modules; both should depend on abstractions.

In our case: the producer and consumer should depend on abstractions of the broker and the cluster, not on their concrete implementations.

Abstraction

type Broker interface {
    Do(Request, Response) error
    Close()
}

type Cluster interface {
    Coordinator(group string) (Broker, error)
    CoordinatorIsDown(group string)
    Leader(topic string, partition int32) (Broker, error)
    LeaderIsDown(topic string, partition int32)
    Partitions(topic string) ([]int32, error)
}

type Request interface {
    Send(io.Writer) error
    ID() int32
    SetID(int32)
}

type Response interface {
    Receive(io.Reader) error
    ID() int32
}
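
One immediate payoff: anything that depends on these interfaces can be tested without a TCP connection. A hypothetical fake broker:

// fakeBroker satisfies Broker by replaying a canned response.
type fakeBroker struct {
    raw []byte // a pre-encoded response body
}

func (f *fakeBroker) Do(req Request, resp Response) error {
    return resp.Receive(bytes.NewReader(f.raw))
}

func (f *fakeBroker) Close() {}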

Details

Payload.Produce

func (p *Payload) Produce(c model.Cluster) error {
    leader, err := c.Leader(p.Topic, p.Partition)
    if err != nil {
        return err
    }
    if err := p.DoProduce(leader); err != nil {
        if IsNotLeader(err) {
            c.LeaderIsDown(p.Topic, p.Partition)
        }
        return err
    }
    return nil
}

func (p *Payload) DoProduce(b model.Broker) error {
    ......
    if err := b.Do(&req, &resp); err != nil {
        return err
    }
    ......
}
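
Note the division of labor: Produce only reports that the leader is down, so the cluster can invalidate its cache; whether and when to retry is the caller's decision. A hypothetical retry loop:

// Hypothetical retry helper; maxRetries and backoff are tunables,
// not kpax names.
func produceWithRetry(p *proto.Payload, c model.Cluster) error {
    var err error
    for i := 0; i < maxRetries; i++ {
        if err = p.Produce(c); err == nil {
            return nil
        }
        time.Sleep(backoff) // give the cluster time to elect a new leader
    }
    return err
}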

Configuration

type Payload struct {
    Topic        string
    Partition    int32
    MessageSet   MessageSet
    RequiredAcks ProduceAckType
    AckTimeout   time.Duration
}

type Broker struct {
    Addr     string
    Timeout  time.Duration
}

type Producer struct {
    Cluster      model.Cluster
    RequiredAcks proto.ProduceAckType
    AckTimeout   time.Duration
}

NewCluster(newBroker NewBrokerFunc, brokers []string) model.Cluster

"New" constructor returns an object with default configuration.

Command Line Tool

go install h12.io/kpax/cmd/kpax

Sub-commands:

Verification

Unit Testing with Docker

Using h12.io/realtest/kafka:

func TestXXX(t *testing.T) {
    // start (or connect to) a Kafka instance running in Docker
    k, err := kafka.New()
    if err != nil {
        t.Fatal(err)
    }
    partitionCount := 2
    // create a uniquely named topic so parallel tests do not clash
    topic, err := k.NewRandomTopic(partitionCount)
    if err != nil {
        t.Fatal(err)
    }
    defer k.DeleteTopic(topic)

    // Your test code goes here
}

Message Counting

Manual checking:

Continuous monitoring:

Some Thoughts on Messaging Systems

When could a Messaging System be Overkill?

When is a Messaging System INDEED useful?

How to achieve reliable persistence?

Crash consistency is hard, but possible at a huge performance cost (fsync)

To get short response times, perhaps we should either give up persistence,

or give up manual fsync and rely on replication:
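
To make the fsync trade-off concrete, here is a sketch contrasting syncing every record with syncing every N records (names are illustrative):

// writeLog appends records, calling fsync every syncEvery records.
// syncEvery == 1 gives crash consistency; larger values trade
// durability of the tail for much higher throughput.
func writeLog(f *os.File, records [][]byte, syncEvery int) error {
    for i, r := range records {
        if _, err := f.Write(r); err != nil {
            return err
        }
        if (i+1)%syncEvery == 0 {
            if err := f.Sync(); err != nil { // fsync(2)
                return err
            }
        }
    }
    return f.Sync()
}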

Thank you

Hǎi-Liàng "Hal" Wáng

Gopher at Appcoach
