Overview

Processing a stream of messages with a mix of purely computational functions and remote procedure calls is a common challenge in today’s high-performance, concurrent systems. Go is designed to handle concurrency well, but even in Go a hybrid load like this can make it hard to achieve good CPU utilization. In this article, we will discuss the Hybrid Handler pattern, a simple, unified approach to this challenge.

The problem

Suppose we have a stream of incoming messages that we need to process with a list of handlers, aggregating the results and sending the output downstream.

If all the handlers are purely local computations, then an implementation like the one below already meets the basic functional and performance requirements.

type Processor struct {
    handlers []SyncHandler
}

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := NewOutput()
    for _, handler := range p.handlers {
        result, err := handler.Handle(ctx, message)
        if err != nil {
            return err
        }
        output.Add(result)
    }
    return p.SendOutput(output)
}

For example, with an 8-core CPU we can simply initialise 8 Processor objects and run them in 8 goroutines, which makes full use of the hardware (error handling is omitted here).

for i := 0; i < numCPU; i++ {
    go func() {
        processor := NewProcessor()
        for message := range inputChan {
            processor.ProcessMessage(ctx, message)
        }
    }()
}

However, if the processing logic involves a mix of local computations and remote procedure calls, it becomes more difficult to process a high volume of messages in a timely and efficient manner. If the processor still handles messages one by one, it must wait for all handlers to complete before emitting the corresponding result, and the added I/O round-trip delay for each message is so high that the CPU cores stay idle most of the time.

To mitigate this issue, one could try to saturate the CPU with brute force, parallelizing the processors across many more goroutines, much like a typical Go microservice that dedicates one goroutine to each incoming request. While this might work well enough for an I/O-bound application or when traffic is low, it can significantly hurt performance when all or most of the handlers are pure computations, because channel message passing and goroutine scheduling are expensive relative to the computations themselves.
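
A rough sketch of that brute-force approach (not a recommendation) is the same worker loop as above, but with far more goroutines than CPU cores so that some can run while others are blocked on I/O; the oversubscription factor below is arbitrary:

// brute force: oversubscribe the CPU so that blocked goroutines
// do not leave the cores idle (the factor 64 is arbitrary)
numWorkers := numCPU * 64
for i := 0; i < numWorkers; i++ {
    go func() {
        processor := NewProcessor()
        for message := range inputChan {
            processor.ProcessMessage(ctx, message)
        }
    }()
}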

Another feasible but more ad-hoc solution is to separate the purely computational handlers from the remote ones into two groups and execute them differently, which could remove the penalty on pure computations. However, waiting for all the results is still a problem, and the two separate groups add complexity to both the start and the completion of those handlers.

What we need here is an abstraction that unifies local and remote handlers without the overhead of goroutines and channels. This would allow purely computational handlers to run at bare-metal speed while remote handlers do not block the pipeline unnecessarily.

The Hybrid Handler pattern

type Handler interface {
    Handle(
        ctx          context.Context,
        message      Message,
        returnResult ReturnResultFunc,
    ) error
}

type ReturnResultFunc func(ctx context.Context, result Result) error

The key to addressing these challenges is the callback parameter returnResult: it allows an implementation of the interface to be either a synchronous pure computation or an asynchronous I/O operation, without introducing much overhead.

Is it just a function with a callback?

Yes. At first glance it might look not only trivial but also unidiomatic in Go, but on closer inspection it is a simple and natural solution to the problem at hand.

The new processor

With the Handler interface above, the Processor can be re-implemented as below.

On arrival of each result, the output atomically decrements a counter to determine whether it is ready to be sent downstream. This adds far less overhead than the brute-force approach of spawning many goroutines.

type Processor struct {
    handlers []Handler
}

type Output struct {
    handlerCount int64
    processor    *Processor
}

func (p *Processor) NewOutput(handlerCount int) *Output {
    return &Output{
        handlerCount: int64(handlerCount),
        processor:    p,
    }
}

func (p *Processor) ProcessMessage(ctx context.Context, message Message) error {
    output := p.NewOutput(len(p.handlers))
    for _, handler := range p.handlers {
        // Handle may return the result synchronously via output.Add,
        // or later from a background goroutine.
        if err := handler.Handle(ctx, message, output.Add); err != nil {
            return err
        }
    }
    return nil
}

func (o *Output) Add(ctx context.Context, result Result) error {
    // add the result to the output
    // ......

    // check if this is the result of the last handler
    if atomic.AddInt64(&o.handlerCount, -1) > 0 {
        return nil // not the last one
    }

    // this is the last one: send the aggregated output downstream
    return o.processor.SendOutput(o)
}

Synchronous handlers

It is almost trivial to write an adapter from any synchronous handler to the Handler interface above, and the overhead is just one extra interface method call.

type SyncHandler interface {
    Handle(ctx context.Context, message Message) (Result, error)
}

type SyncHandlerAdapter struct {
    syncHandler SyncHandler
}

func (a SyncHandlerAdapter) Handle(
    ctx          context.Context,
    message      Message,
    returnResult ReturnResultFunc,
) error {
    result, err := a.syncHandler.Handle(ctx, message)
    if err != nil {
        return err
    }
    return returnResult(ctx, result)
}
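
For example, an adapted synchronous handler can then sit next to an asynchronous one in the same handler slice; localScoringHandler and remoteEnrichmentHandler below are hypothetical handlers made up for this illustration:

processor := &Processor{
    handlers: []Handler{
        SyncHandlerAdapter{syncHandler: localScoringHandler}, // pure local computation
        remoteEnrichmentHandler,                              // asynchronous RPC wrapper
    },
}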

Asynchronous handlers

There are various techniques for wrapping a remote procedure call in an asynchronous handler efficiently, e.g. a batch or a streaming API, and I do not intend to cover the details here. Regardless of the implementation, the Handler interface allows the Handle method to return immediately without blocking the processing of the next message, while a background goroutine can send the result back through the returnResult callback once it is ready.
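
As a minimal, illustrative sketch (not a production implementation), an asynchronous handler might hand the call off to a background goroutine and invoke returnResult when the response arrives; RemoteClient and its Call method are assumptions made up for this example:

// RemoteClient is a hypothetical client for the remote service.
type RemoteClient interface {
    Call(ctx context.Context, message Message) (Result, error)
}

type RemoteHandler struct {
    client RemoteClient
}

func (h RemoteHandler) Handle(
    ctx          context.Context,
    message      Message,
    returnResult ReturnResultFunc,
) error {
    go func() {
        // hypothetical remote call; error handling is discussed below
        result, _ := h.client.Call(ctx, message)
        _ = returnResult(ctx, result)
    }()
    // return immediately so the next message can be processed
    // while the remote call is still in flight
    return nil
}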

A hybrid handler is also possible, returning the result immediately for some inputs while doing it asynchronously for others.

We need to pay extra attention to error handling in asynchronous handlers. The handler must ensure a result is always returned so that the output decrements its count correctly; one possible way is to embed the error in the result that is passed back.
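
Continuing the sketch above, and assuming Result can carry an error (the Err field below is made up for this illustration), the background goroutine can call returnResult in both the success and the failure case:

go func() {
    result, err := h.client.Call(ctx, message)
    if err != nil {
        result = Result{Err: err} // hypothetical field embedding the handler error
    }
    // returnResult is called exactly once either way, so the pending
    // counter in Output is always decremented for this handler
    _ = returnResult(ctx, result)
}()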

Conclusion

The Hybrid Handler pattern allows synchronous and asynchronous handlers to coexist without incurring significant overhead, resulting in improved CPU utilization and reduced complexity. By adopting it, developers can get the most out of their hardware while keeping the flexibility to handle a mix of computational and I/O-bound workloads.