Go Pattern: Hybrid Handler

22 April 2023

Overview

In today’s high-performance and concurrent computing environments, effectively processing a stream of messages using a mix of purely computational functions and remote procedure calls has become a significant challenge. The Go programming language is designed to handle concurrency well, but when it comes to managing a hybrid load, even Go can struggle to achieve optimal CPU utilization. In this article, we will discuss the Hybrid Handler pattern, an efficient and unified approach to address this challenge.

The problem

Suppose we have a stream of incoming messages that we need to process with a list of handlers, aggregating the results and sending the output downstream.

If all the handlers are purely local computations, then an implementation like the one below already meets the basic functional and performance requirements.

type Processor struct {
    handlers []SyncHandler
}

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := NewOutput()
    for _, handler := range p.handlers {
        result, err := handler.Handle(ctx, message)
        if err != nil {
            return err
        }
        output.Add(result)
    }
    return p.SendOutput(output)
}

For example, with an 8-core CPU, we can simply initialise 8 Processor objects and run them in 8 goroutines, which makes full use of the hardware (error handling is omitted here):

for i := 0; i < numCPU; i++ { // numCPU could be runtime.NumCPU()
    go func() {
        processor := NewProcessor()
        for message := range inputChan {
            processor.ProcessMessage(ctx, message)
        }
    }()
}

However, if the processing logic involves a mix of local computations and remote procedure calls, it becomes more difficult to process a high volume of messages in a timely and efficient manner. If the processor still handles messages one by one, it must wait for all handlers to complete before outputting the corresponding result, and the added I/O round-trip delay for each message is so high that the CPU cores stay idle most of the time.

To mitigate this issue, one could attempt to saturate the CPU with brute force, parallelizing the processors with even more goroutines, as in most microservices written in Go, where one goroutine is used to handle one incoming request. While this might work well enough for an IO-bound application or when the traffic is low, it can penalize the performance significantly when all or most of the handlers are pure computations due to the relative costs of channel message passing and goroutine scheduling compared to the computations themselves.

Another feasible but more ad-hoc solution is to separate purely computational handlers from remote handlers into two groups and execute them differently, which could remove the penalty on pure computations. However, waiting for all the results is still a problem, and the two separate groups add more complexity to both the start and the finish of those functions.

What we need here is an abstraction that unifies local and remote handlers without the overhead of goroutines and channels. This would allow purely computational handlers to achieve bare metal speed while remote handlers don’t block unnecessarily.

The Hybrid Handler pattern

type Handler interface {
    Handle(
        ctx          context.Context,
        message      Message,
        returnResult ReturnResultFunc,
    ) error
}

type ReturnResultFunc func(ctx context.Context, result Result) error

The key to addressing these challenges lies in an interface method that provides a callback parameter returnResult for returning the result, which allows the implementation of the interface to be either a synchronous pure computation or an asynchronous IO operation without introducing much overhead.

Is it just a function with a callback?

Yes. At first glance, it might look not only trivial but also unidiomatic in Go, but on closer inspection, it is a simple and natural solution to the problem at hand, and it makes a lot of sense.

The new processor

With the Handler interface above, the Processor can be re-implemented as below.

On arrival of each result, the output can decrement a counter atomically to determine if it’s ready to be sent downstream. This operation adds much less overhead than the brute force way of adding many goroutines.

type Processor struct {
    handlers []Handler
}

type Output struct {
    handlerCount int64
    processor    *Processor
}

func (p *Processor) NewOutput(handlerCount int) *Output {
    return &Output{
        handlerCount: int64(handlerCount),
        processor:    p,
    }
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := p.NewOutput(len(p.handlers))
    for _, handler := range p.handlers {
        if err := handler.Handle(ctx, message, output.Add); err != nil {
            return err
        }
    }
    return nil
}

func (o *Output) Add(ctx context.Context, result Result) error {
    // add the result to the output
    // ......

    // check if it's the result of the last handler
    if atomic.AddInt64(&o.handlerCount, -1) > 0 {
        return nil // not the last one
    }

    // this is the last one, send the output downstream
    return o.processor.SendOutput(o)
}

Synchronous handlers

It is almost trivial to write an adapter from any synchronous handler to the Handler interface above, and the overhead is just one extra interface method call.

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

type SyncHandlerAdapter struct {
    syncHandler SyncHandler
}

func (a SyncHandlerAdapter) Handle(
    ctx          context.Context,
    message      Message,
    returnResult ReturnResultFunc,
) error {
    result, err := a.syncHandler.Handle(ctx, message)
    if err != nil {
        return err
    }
    return returnResult(ctx, result)
}

Asynchronous handlers

There are various techniques to wrap a remote procedure call in an asynchronous handler efficiently, e.g. a batch or a streaming API. I do not intend to cover the details here, but regardless of the implementations, the Handler interface allows the Handle method to return immediately without blocking the processing of the next message while a background goroutine can send the result back with the returnResult callback function when the result is ready later.

A hybrid handler is also possible, returning the result immediately for some inputs while doing it asynchronously for others.

We need to pay extra attention to error handling in asynchronous handlers: the handler must ensure a result is always returned so that the output can decrement its counter correctly. One possible way is to embed the error in the result that is passed back, as the sketch below does.
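For illustration, here is a minimal sketch of an asynchronous handler, assuming a hypothetical RemoteClient and an error-carrying NewErrorResult; spawning one goroutine per message is shown only for brevity, as a real implementation would rather batch or stream the calls.

type AsyncHandlerAdapter struct {
    client RemoteClient // hypothetical remote client
}

func (a AsyncHandlerAdapter) Handle(
    ctx          context.Context,
    message      Message,
    returnResult ReturnResultFunc,
) error {
    go func() {
        result, err := a.client.Call(ctx, message)
        if err != nil {
            // embed the error in the result so the output
            // can still decrement its counter correctly
            result = NewErrorResult(err)
        }
        _ = returnResult(ctx, result)
    }()
    return nil // returns immediately without blocking the next message
}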

Conclusion

The Hybrid Handler pattern allows synchronous and asynchronous handlers to coexist without incurring significant overhead, resulting in improved CPU utilization and reduced complexity. By applying the pattern, developers can optimize performance in high-performance concurrent systems while keeping the flexibility to handle various kinds of workloads.

Less Is More: Directed Acyclic Graph

12 May 2022

A directed acyclic graph or DAG is a directed graph with no directed cycles.

An arbitrary directed graph may also be transformed into a DAG, called its condensation, by contracting each of its strongly connected components into a single super vertex.

Go’s import declaration declares a dependency relation between the importing and imported package. It is illegal for a package to import itself, directly or indirectly, or to directly import a package without referring to any of its exported identifiers.

“Allowing cycles enables laziness, poor dependency management, and slow builds” - Rob Pike.

So, definitions so strongly connected to each other that they form cycles should be “condensed” into one package, period.

Less is more.

Go Pattern: Runner

22 February 2022

Again and again, a concurrent pattern emerges from the need to control goroutine lifecycles and handle their errors, and I call it the “Runner Pattern”.

The runner interface and its contract

The pattern is as simple as a single-method interface:

// Runner defines the Run method to be executed within a goroutine
type Runner interface {
	Run(ctx context.Context) error
}

The contract of the interface covers two aspects.

On the goroutine lifecycle, the Run method will block until one of the following occurs:

  • it completes successfully and returns nil
  • it fails and returns an error
  • it returns the context error as soon as the context gets cancelled

On the error handling, the contract of a Runner also implies that:

  • all the errors that need to be handled by the caller are returned by the Run method
  • the Run method either can be called concurrently or returns an error if it cannot
  • the Run method does not spawn its own goroutine directly unless there are nested Runners, whose errors need to be returned by the Run method too

A group of runners

To manage multiple runners as a group, it is straightforward to implement a runner group as below, with sync primitives like WaitGroup and Once (a reference sketch follows):

type Group interface {
	Go(r Runner)
	Wait() error
}

func NewGroup(ctx context.Context) Group

With this simple group API, we could add multiple runners to a group and wait for all of them to complete. By default, Wait method returns the first error that occurred in any of the runners, or nil if all runners completed successfully.
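Below is a minimal reference sketch of such a group, using WaitGroup and Once, with the first-error semantics described above (the remaining runners are cancelled once one fails):

type group struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    once   sync.Once
    err    error
}

func NewGroup(ctx context.Context) Group {
    ctx, cancel := context.WithCancel(ctx)
    return &group{ctx: ctx, cancel: cancel}
}

func (g *group) Go(r Runner) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        if err := r.Run(g.ctx); err != nil {
            g.once.Do(func() {
                g.err = err // keep the first error
                g.cancel()  // cancel the remaining runners
            })
        }
    }()
}

func (g *group) Wait() error {
    g.wg.Wait()
    g.cancel() // release the context resources
    return g.err
}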

Rationale

Go’s CSP-style concurrency model enables us to write synchronous code intuitively while, under the hood, the runtime schedules goroutines off the thread when they block and resumes them when they unblock.

We should make full use of the unique ability of Go, controlling lifecycles and handling errors in an intuitively synchronous way rather than fighting against CSP and writing asynchronous-style code everywhere (e.g. callbacks, future/promise, async/await etc).

With the runner pattern, we abstract away the boilerplate of goroutine spawning and error handling, so that each piece of concurrent code can focus on its own business logic.

Is that just an error group?

The error group could be used to implement the runner pattern. In a sense, you could even call it an error group pattern. However, the runner pattern is more about the interface contract rather than the group implementation, and the contract is not only about error handling but also about the goroutine lifecycle.

Implementation tips of a runner

Here are a few tips to make a runner correct and efficient; a minimal sketch applying them follows the list:

  • options, input/output channels and other dependencies can all be arguments passed into the constructor function of a runner
  • a runner object should not contain any state mutable after initialisation, instead, mutable state and other resources should be scoped within the Run method
  • the error handling logic of returning the first error can be overridden by handling a specific type of error within the Run method
  • remember passing the context into wherever it is needed, and monitor if it gets cancelled, especially within loops
  • all the resources allocated within the Run method must be released when it returns
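Below is a minimal sketch applying these tips, assuming a hypothetical runner that consumes messages from an input channel:

type consumer struct {
    in <-chan string // input channel injected via the constructor
}

func NewConsumer(in <-chan string) *consumer {
    return &consumer{in: in}
}

func (c *consumer) Run(ctx context.Context) error {
    count := 0 // mutable state is scoped within Run, not on the struct
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // return as soon as the context is cancelled
        case msg, ok := <-c.in:
            if !ok {
                return nil // input exhausted: completed successfully
            }
            count++
            _ = msg // process the message here
        }
    }
}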

Go Anti-pattern: Parent Closer

8 January 2021

Imagine you need to wrap multiple objects which implement io.Closer, e.g. three clients to fetch and combine data from different endpoints.

type Parent struct {
    child1 Child1
    child2 Child2
    child3 Child3
}

Parent closer

Let’s see how we can create and destroy a parent object.

func NewParent() (*Parent, error) {
    child1, err := NewChild1()
    if err != nil {
        return nil, err
    }
    child2, err := NewChild2()
    if err != nil {
        // oops, child1 needs to be closed here
        child1.Close()
        return nil, err
    }
    child3, err := NewChild3()
    if err != nil {
        // oops again, both child1 and child2 need to be closed here
        child1.Close()
        child2.Close()
        return nil, err
    }
    return &Parent{
        child1: child1,
        child2: child2,
        child3: child3,
    }, nil
}

func (p *Parent) Close() error {
    var errs []error
    if err := p.child1.Close(); err != nil {
        errs = append(errs, err)
    }
    if err := p.child2.Close(); err != nil {
        errs = append(errs, err)
    }
    if err := p.child3.Close(); err != nil {
        errs = append(errs, err)
    }
    return multierr.Combine(errs...)
}

Note the boilerplate code of closing the children. Because the parent creates its children, it must be responsible for calling their Close method whenever needed. If there are any errors during the initialisation, the children already created have to be properly closed, and before the parent exits its scope, it has to close its children too.

Furthermore, the Closer interface is contagious. If we organise our code by wrapping objects layer by layer like above, and any one of the descendants is a Closer, then all the types in the hierarchy are infected and have to implement the Closer interface too.

Parent container

Unlike the parent closer, all of this complexity could have been avoided if the parent were a simple container, borrowing references to the children rather than owning them, as long as the children outlive their parent.


func NewParent(child1 Child1, child2 Child2, child3 Child3) *Parent {
    return &Parent{child1: child1, child2: child2, child3: child3}
}

func run() error {
    child1, err := NewChild1()
    if err != nil {
        return err
    }
    defer child1.Close()
    child2, err := NewChild2()
    if err != nil {
        return err
    }
    defer child2.Close()
    child3, err := NewChild3()
    if err != nil {
        return err
    }
    defer child3.Close()

    parent := NewParent(child1, child2, child3)

    // the parent can be used safely here before func run returns
    _ = parent
    return nil
}

It is usually straightforward to guarantee that the children outlive their parent in real cases:

  • either the parent is created and held by a service during its whole lifetime, and func run could be a function that keeps running until the service terminates
  • or the parent is created when handling a request, and func run is the request handler

The key difference between a “parent closer” and a “parent container” is that the latter makes it possible to use defer statements to close the children in both the error and the normal cases, so the duplicated clean-up code can be avoided.

Conclusion

io.Closer interfaces are contagious. Usually, we do not want to wrap them to make another io.Closer; instead, we should only wrap them by borrowing references, without managing their lifetime within the wrapper.

Probability as a Generalisation of Boolean Algebra

14 December 2020

A summary of Boolean algebra

Given the following notations:

  • a proposition is denoted as a lowercase letter, e.g. $p$, $q$
  • the truth value of a proposition $p$ is denoted as $ B(p) \in \{0, 1\} $, where $B(p)=1$ if $p$ is true or $B(p)=0$ if $p$ is false

Negation (not, $¬$), conjunction (and, $∧$) and disjunction (or, $∨$) are defined by the truth tables below:

$B(p)$   $B(¬p)$
  0         1
  1         0

$B(p)$   $B(q)$   $B(p∧q)$   $B(p∨q)$
  0        0         0          0
  0        1         0          1
  1        0         0          1
  1        1         1          1

Truth value allocation as an intuitive picture of Boolean algebra

Intuitively, we can derive the same truth tables by following the process of the truth value allocation with the two steps below:

  • List all mutually exclusive results from propositions under consideration
  • Allocate the truth value to them so that their truth values sum up to $1$

Note that the second step actually requires that exactly one of the results has a truth value of $1$, given $ B(p) \in \{0, 1\} $.

Negation

For a single proposition $p$, there are two mutually exclusive results $p$ (is true) or $¬p$ (is true), as follows:

  $¬p$      $p$
$B(¬p)$   $B(p)$

Where we can allocate the truth value:

$$B(¬p) + B(p) = 1$$

Thus we get an equivalent equation to the negation truth table.

Conjunction

Now let’s consider two propositions $p$ and $q$ at the same time. There are four mutually exclusive results: $(p∧q)$, $(¬p∧q)$, $(p∧¬q)$ or $(¬p∧¬q)$. Thus we have the truth value allocation table:

          $¬p$          $p$
$¬q$   $B(¬p∧¬q)$   $B(p∧¬q)$
$q$    $B(¬p∧q)$    $B(p∧q)$

Where we can allocate the truth value:

$$ B(¬p∧¬q) + B(p∧¬q) + B(¬p∧q) + B(p∧q) = 1 $$

From the allocation we can easily get $B(p)$ as the sum of the second column:

$$ B(p) = B(p∧q) + B(p∧¬q) $$

And $B(q)$ as the sum of the second row:

$$ B(q) = B(p∧q) + B(¬p∧q) $$

Similar equations can be derived for $B(¬p)$ as the sum of the first column and $B(¬q)$ as the sum of the first row.

If we use the notation $B(q|p)$ as the ratio of truth value “q is true” given “p is true”, the conjunction $B(p∧q)$ can be expressed as:

$$ B(p∧q) = B(p) \times B(q|p) $$

This equation can be read from the allocation table as “the sum of the first column where $p$ is true” times “the ratio of $q$ is true given $p$ is true”.

Also, note that $B(q|p) = B(q)$ whenever the ratio is defined, i.e. when $B(p) = 1$ (when $B(p) = 0$ the conjunction is $0$ regardless); this can be read off the allocation table. Thus we have the equivalent equation for the conjunction:

$$ B(p∧q) = B(p) \times B(q) $$

Disjunction

The disjunction $B(p∨q)$ can be calculated by summing up the second column (where $p$ is true) and the second row (where $q$ is true) but subtracting $B(p∧q)$ (which is summed twice):

$$ B(p∨q) = B(p) + B(q) - B(p∧q) $$

From Boolean algebra to probability

The probability can be generalised from the similar notations and intuitive allocation picture, by replacing the truth value with the probability value.

Given the following notations:

  • a proposition is denoted as a lowercase letter, e.g. $p$, $q$
  • the probability of a proposition $p$ is denoted as $ P(p) \in [0, 1] $, where $P(p)$ represents our belief in the truthfulness of the proposition $p$

If we enumerate all possible mutually exclusive outcomes and allocate the probability value $1$ among them, we will get equations similar to those of Boolean algebra.

$$ P(p) + P(¬p) = 1 $$

The product (chain) rule:

$$ P(p∧q) = P(p) \times P(q|p) $$

Where $P(q|p)$ is called the conditional probability of $q$ given $p$.

And the sum rule:

$$ P(p∨q) = P(p) + P(q) - P(p∧q) $$

Then all the rest of the probability theory could be derived from the definitions above.
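For example, Bayes’ theorem follows directly from applying the product rule in both orders, $ P(p∧q) = P(p) \times P(q|p) = P(q) \times P(p|q) $:

$$ P(q|p) = \frac{P(q) \times P(p|q)}{P(p)} $$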

Also note that the notations above are equivalent to Kolmogorov’s first and second axioms, and the probability allocation is the third axiom, only expressed more intuitively but less formally.

Go Pattern: Context-aware Lock

30 November 2020

We often use Mutex or RWMutex as locks in Go, but sometimes we need a lock that can be cancelled by a context during the lock attempt.

The pattern is simple - we use a channel with length 1:

lockChan := make(chan struct{}, 1)
lockChan <- struct{}{} // lock
<-lockChan             // unlock

When multiple goroutines try to obtain the lock, only one of them can fill the single slot, and the rest are blocked until the slot is emptied again by a readout.

Unlike mutexes, we could easily cancel the lock attempt with a context:

select {
case <-ctx.Done():
    // cancelled
case lockChan <- struct{}{}:
    // locked
}

Let’s wrap it up:

type CtxMutex struct {
    ch chan struct{}
}

func NewCtxMutex() *CtxMutex {
    return &CtxMutex{ch: make(chan struct{}, 1)} // a single slot for the lock
}

func (mu *CtxMutex) Lock(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return false // gave up before acquiring the lock
    case mu.ch <- struct{}{}:
        return true
    }
}

func (mu *CtxMutex) Unlock() {
    <-mu.ch
}

func (mu *CtxMutex) Locked() bool {
    return len(mu.ch) > 0 // locked or not
}

Further, context.WithTimeout could be used to apply a timeout to the lock.
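A brief usage sketch, assuming the NewCtxMutex constructor above:

mu := NewCtxMutex()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if mu.Lock(ctx) { // gives up after one second
    defer mu.Unlock()
    // critical section
}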

Go Pattern: Buffered Writer

22 November 2020

A buffered writer is so ubiquitous that we do not usually consider it as a pattern, but sometimes we reinvent it or even do it in an inferior way. Let us look at a real use case first.

Batch processor

What would you do to improve the throughput of a service? The answer is short: batching.

By processing and sending in a batch of multiple items instead of a single item at a time, you are amortizing the network overhead from the request-response round trip among all the items in the batch.

Then how would you design a client interface to do that batching?

How about this?

type BatchProcessor interface {
    Process(items []Item) error
}

It looks like a straightforward solution, but in reality, it introduces unnecessary complexity in both business logic and error handling.

The processing is often composed of multiple steps working on the items, e.g. transformations and encoding.

items -> transformations -> encoding -> bytes

With a batch processor interface like the one above, each step has to loop over the items, and each step has to deal with errors from multiple items. Not only is there more complexity, but there is also less flexibility. What if the client would like to send the rest of the items even if some of them return errors? What if the client instead would like to discard the whole batch if any one item is erroneous?

There must be a better way.

End-to-end principle

“Smart terminals, dumb network”. The end-to-end (e2e) principle, articulated in the field of computer networking, basically says that any smart features should reside in the communicating end nodes rather than in intermediary nodes.

In our use case, the smart feature is batching. Following e2e, we make sure each step processes only a single item, and only the initial sender and the final receiver know about the batching.

There are various examples in Go’s standard library that already do this, e.g. bufio.Writer. The basic idea is an interface similar to the one below:

type BufferedWriter interface {
    Write(item Item) error
    Flush() error
}

The caller issues multiple writes to make a batch and a flush to mark the end of the batch. The writer chains the transformation and encoding steps of an item in a single Write method and returns the error for that item. When the Flush method is called, the writer sends the whole batch downstream and completes it, as in the usage sketch below.
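From the caller’s side, the usage might look like this, assuming a hypothetical NewBufferedWriter constructor that wraps an underlying connection:

w := NewBufferedWriter(conn)
for _, item := range items {
    if err := w.Write(item); err != nil {
        // a per-item error: skip this item or abort the whole batch
    }
}
if err := w.Flush(); err != nil {
    // the batch as a whole failed to be sent
}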

Stateless vs Stateful

On the surface, BatchProcessor is stateless while BufferedWriter is stateful, but the former merely pushes onto its caller the responsibility of aggregating a batch, which is a stateful operation. On the other hand, the final step of the processing, the underlying driver, is stateful too, regardless of whether it is file or network IO. So BufferedWriter does not add any additional burden on its caller for managing a stateful interface.

Rather, BufferedWriter not only simplifies the chain of processing within it, but also simplifies the batching logic on its caller side.

Concurrency

A BufferedWriter can be made concurrency-safe by locking both the Write and Flush methods. However, the ideal way of calling a BufferedWriter is from a single goroutine, so that the caller controls exactly what goes into each batch and we can get rid of the overhead of the lock.

If multiple goroutines must share a single underlying writer and at the same time want to control its own batches, then we could return an object instead of flushing, as below:

type Builder interface {
    Write(item Item) error
    Bytes() []byte // return bytes
    Object() Batch // or a batch object
}

In fact, it becomes the Builder Pattern. Each goroutine has its own builder, building its own batches, and then sending those batches to a shared driver.
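For illustration, the per-goroutine flow might look like the sketch below, where NewBuilder and driverChan are hypothetical:

builder := NewBuilder()
for _, item := range myItems {
    if err := builder.Write(item); err != nil {
        // handle the per-item error
    }
}
driverChan <- builder.Object() // send the completed batch to the shared driver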

In addition, we could even have various write methods, each for its own item type.

Transaction

If the caller needs to discard a batch, we could extend it with a rollback method, similar to sql.Tx:

type TxWriter interface {
    Write(item Item) error
    Commit() error
    Rollback() error
}

Then it becomes the Unit of Work Pattern.
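A brief usage sketch, discarding the whole batch if any item fails (NewTxWriter is a hypothetical constructor):

tx := NewTxWriter(conn)
for _, item := range items {
    if err := tx.Write(item); err != nil {
        tx.Rollback() // discard the whole batch
        return err
    }
}
return tx.Commit() // send the whole batch atomically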

Conclusion

Whenever we want to process and send multiple items, consider the Buffered Writer pattern and its variants, and see if one of them better suits our needs.