Showing posts with label Low-Latency. Show all posts
Showing posts with label Low-Latency. Show all posts

Monday, 5 May 2014

Simple Binary Encoding

Financial systems communicate by sending and receiving vast numbers of messages in many different formats. When people use terms like "vast" I normally think, "really..how many?" So lets quantify "vast" for the finance industry. Market data feeds from financial exchanges typically can be emitting tens or hundreds of thousands of message per second, and aggregate feeds like OPRA can peak at over 10 million messages per second with volumes growing year-on-year. This presentation gives a good overview.

In this crazy world we still see significant use of ASCII encoded presentations, such as FIX tag value, and some more slightly sane binary encoded presentations like FAST. Some markets even commit the sin of sending out market data as XML! Well I cannot complain too much as they have at times provided me a good income writing ultra fast XML parsers.

Last year the CME, who are a member the FIX community, commissioned Todd Montgomery, of 29West LBM fame, and myself to build the reference implementation of the new FIX Simple Binary Encoding (SBE) standard. SBE is a codec aimed at addressing the efficiency issues in low-latency trading, with a specific focus on market data. The CME, working within the FIX community, have done a great job of coming up with an encoding presentation that can be so efficient. Maybe a suitable atonement for the sins of past FIX tag value implementations. Todd and I worked on the Java and C++ implementation, and later we were helped on the .Net side by the amazing Olivier Deheurles at Adaptive. Working on a cool technical problem with such a team is a dream job.

SBE Overview

SBE is an OSI layer 6 presentation for encoding/decoding messages in binary format to support low-latency applications. Of the many applications I profile with performance issues, message encoding/decoding is often the most significant cost. I've seen many applications that spend significantly more CPU time parsing and transforming XML and JSON than executing business logic. SBE is designed to make this part of a system the most efficient it can be. SBE follows a number of design principles to achieve this goal. By adhering to these design principles sometimes means features available in other codecs will not being offered. For example, many codecs allow strings to be encoded at any field position in a message; SBE only allows variable length fields, such as strings, as fields grouped at the end of a message.

The SBE reference implementation consists of a compiler that takes a message schema as input and then generates language specific stubs. The stubs are used to directly encode and decode messages from buffers. The SBE tool can also generate a binary representation of the schema that can be used for the on-the-fly decoding of messages in a dynamic environment, such as for a log viewer or network sniffer.

The design principles drive the implementation of a codec that ensures messages are streamed through memory without backtracking, copying, or unnecessary allocation. Memory access patterns should not be underestimated in the design of a high-performance application. Low-latency systems in any language especially need to consider all allocation to avoid the resulting issues in reclamation. This applies for both managed runtime and native languages. SBE is totally allocation free in all three language implementations.

The end result of applying these design principles is a codec that has ~16-25 times greater throughput than Google Protocol Buffers (GPB) with very low and predictable latency. This has been observed in micro-benchmarks and real-world application use. A typical market data message can be encoded, or decoded, in ~25ns compared to ~1000ns for the same message with GPB on the same hardware. XML and FIX tag value messages are orders of magnitude slower again.

The sweet spot for SBE is as a codec for structured data that is mostly fixed size fields which are numbers, bitsets, enums, and arrays. While it does work for strings and blobs, many my find some of the restrictions a usability issue. These users would be better off with another codec more suited to string encoding.

Message Structure

A message must be capable of being read or written sequentially to preserve the streaming access design principle, i.e. with no need to backtrack. Some codecs insert location pointers for variable length fields, such as string types, that have to be indirected for access. This indirection comes at a cost of extra instructions plus losing the support of the hardware prefetchers. SBE's design allows for pure sequential access and copy-free native access semantics.

Figure 1
SBE messages have a common header that identifies the type and version of the message body to follow. The header is followed by the root fields of the message which are all fixed length with static offsets. The root fields are very similar to a struct in C. If the message is more complex then one or more repeating groups similar to the root block can follow. Repeating groups can nest other repeating group structures. Finally, variable length strings and blobs come at the end of the message. Fields may also be optional. The XML schema describing the SBE presentation can be found here.

SbeTool and the Compiler

To use SBE it is first necessary to define a schema for your messages. SBE provides a language independent type system supporting integers, floating point numbers, characters, arrays, constants, enums, bitsets, composites, grouped structures that repeat, and variable length strings and blobs.

A message schema can be input into the SbeTool and compiled to produce stubs in a range of languages, or to generate binary metadata suitable for decoding messages on-the-fly.

    java [-Doption=value] -jar sbe.jar <message-declarations-file.xml>

SbeTool and the compiler are written in Java. The tool can currently output stubs in Java, C++, and C#.

Programming with Stubs

A full example of messages defined in a schema with supporting code can be found here. The generated stubs follow a flyweight pattern with instances reused to avoid allocation. The stubs wrap a buffer at an offset and then read it sequentially and natively.
    // Write the message header first
    MESSAGE_HEADER.wrap(directBuffer, bufferOffset, messageTemplateVersion)
                  .blockLength(CAR.sbeBlockLength())
                  .templateId(CAR.sbeTemplateId())
                  .schemaId(CAR.sbeSchemaId())
                  .version(CAR.sbeSchemaVersion());

    // Then write the body of the message
    car.wrapForEncode(directBuffer, bufferOffset)
       .serialNumber(1234)
       .modelYear(2013)
       .available(BooleanType.TRUE)
       .code(Model.A)
       .putVehicleCode(VEHICLE_CODE, srcOffset);
Messages can be written via the generated stubs in a fluent manner. Each field appears as a generated pair of methods to encode and decode.
    // Read the header and lookup the appropriate template to decode
    MESSAGE_HEADER.wrap(directBuffer, bufferOffset, messageTemplateVersion);

    final int templateId = MESSAGE_HEADER.templateId();
    final int actingBlockLength = MESSAGE_HEADER.blockLength();
    final int schemaId = MESSAGE_HEADER.schemaId();
    final int actingVersion = MESSAGE_HEADER.version();

    // Once the template is located then the fields can be decoded.
    car.wrapForDecode(directBuffer, bufferOffset, actingBlockLength, actingVersion);

    final StringBuilder sb = new StringBuilder();
    sb.append("\ncar.templateId=").append(car.sbeTemplateId());
    sb.append("\ncar.schemaId=").append(schemaId);
    sb.append("\ncar.schemaVersion=").append(car.sbeSchemaVersion());
    sb.append("\ncar.serialNumber=").append(car.serialNumber());
    sb.append("\ncar.modelYear=").append(car.modelYear());
    sb.append("\ncar.available=").append(car.available());
    sb.append("\ncar.code=").append(car.code());

The generated code in all languages gives performance similar to casting a C struct over the memory.

On-The-Fly Decoding

The compiler produces an intermediate representation (IR) for the input XML message schema. This IR can be serialised in the SBE binary format to be used for later on-the-fly decoding of messages that have been stored. It is also useful for tools, such as a network sniffer, that will not have been compiled with the stubs. A full example of the IR being used can be found here.

Direct Buffers

SBE, via Agrona, provides an abstraction to Java, with the MutableDirectBuffer class, to work with buffers that are byte[], heap or direct ByteBuffer buffers, and off heap memory addresses returned from Unsafe.allocateMemory(long) or JNI. In low-latency applications, messages are often encoded/decoded in memory mapped files via MappedByteBuffer and thus can be be transferred to a network channel by the kernel thus avoiding user space copies.

C++ and C# have built-in support for direct memory access and do not require such an abstraction as the Java version does. A DirectBuffer abstraction was added for C# to support Endianess and encapsulate the unsafe pointer access.

Message Extension and Versioning

SBE schemas carry a version number that allows for message extension. A message can be extended by adding fields at the end of a block. Fields cannot be removed or reordered for backwards compatibility.

Extension fields must be optional otherwise a newer template reading an older message would not work. Templates carry metadata for min, max, null, timeunit, character encoding, etc., these are accessible via static (class level) methods on the stubs.

Byte Ordering and Alignment

The message schema allows for precise alignment of fields by specifying offsets. Fields are by default encoded in Little Endian form unless otherwise specified in a schema. For maximum performance native encoding with fields on word aligned boundaries should be used. The penalty for accessing non-aligned fields on some processors can be very significant. For alignment one must consider the framing protocol and buffer locations in memory.

Message Protocols

I often see people complain that a codec cannot support a particular presentation in a single message. However this is often possible to address with a protocol of messages. Protocols are a great way to split an interaction into its component parts, these parts are then often composable for many interactions between systems. For example, the IR implementation of schema metadata is more complex than can be supported by the structure of a single message. We encode IR by first sending a template message providing an overview, followed by a stream of messages, each encoding the tokens from the compiler IR. This allows for the design of a very fast OTF decoder which can be implemented as a threaded interpreter with much less branching than the typical switch based state machines.

Protocol design is an area that most developers don't seem to get an opportunity to learn. I feel this is a great loss. The fact that so many developers will call an "encoding" such as ASCII a "protocol" is very telling. The value of protocols is so obvious when one gets to work with a programmer like Todd who has spent his life successfully designing protocols.

Stub Performance

The stubs provide a significant performance advantage over the dynamic OTF decoding. For accessing primitive fields we believe the performance is reaching the limits of what is possible from a general purpose tool. The generated assembly code is very similar to what a compiler will generate for accessing a C struct, even from Java!

Regarding the general performance of the stubs, we have observed that C++ has a very marginal advantage over the Java which we believe is due to runtime inserted Safepoint checks. The C# version lags a little further behind due to its runtime not being as aggressive with inlining methods as the Java runtime. Stubs for all three languages are capable of encoding or decoding typical financial messages in tens of nanoseconds. This effectively makes the encoding and decoding of messages almost free for most applications relative to the rest of the application logic.

Feedback

This is the first version of SBE and we would welcome feedback. The reference implementation is constrained by the FIX community specification. It is possible to influence the specification but please don't expect pull requests to be accepted that significantly go against the specification. Support for Javascript, Python, Erlang, and other languages has been discussed and would be very welcome.


Update: 08-May-2014

Thanks to feedback from Kenton Varda, the creator of GPB, we were able to improve the benchmarks to get the best performance out of GPB. Below are the results for the changes to the Java benchmarks.

The C++ GPB examples on optimisation show approximately a doubling of throughput compared to initial results. It should be noted that you often have to do the opposite in Java with GPB compared to C++ to get performance improvements, such as allocate objects rather than reuse them.

Before GPB Optimisation:
Mode Thr    Cnt  Sec         Mean   Mean error    Units
     [exec] u.c.r.protobuf.CarBenchmark.testDecode           thrpt   1     30    1      462.817        6.474   ops/ms
     [exec] u.c.r.protobuf.CarBenchmark.testEncode           thrpt   1     30    1      326.018        2.972   ops/ms
     [exec] u.c.r.protobuf.MarketDataBenchmark.testDecode    thrpt   1     30    1     1148.050       17.194   ops/ms
     [exec] u.c.r.protobuf.MarketDataBenchmark.testEncode    thrpt   1     30    1     1242.252       12.248   ops/ms

     [exec] u.c.r.sbe.CarBenchmark.testDecode                thrpt   1     30    1    10436.476      102.114   ops/ms
     [exec] u.c.r.sbe.CarBenchmark.testEncode                thrpt   1     30    1    11657.190       65.168   ops/ms
     [exec] u.c.r.sbe.MarketDataBenchmark.testDecode         thrpt   1     30    1    34078.646      261.775   ops/ms
     [exec] u.c.r.sbe.MarketDataBenchmark.testEncode         thrpt   1     30    1    29193.600      443.638   ops/ms
After GPB Optimisation:
Mode Thr    Cnt  Sec         Mean   Mean error    Units
     [exec] u.c.r.protobuf.CarBenchmark.testDecode           thrpt   1     30    1      619.467        4.429   ops/ms
     [exec] u.c.r.protobuf.CarBenchmark.testEncode           thrpt   1     30    1      433.711       10.364   ops/ms
     [exec] u.c.r.protobuf.MarketDataBenchmark.testDecode    thrpt   1     30    1     2088.998       60.619   ops/ms
     [exec] u.c.r.protobuf.MarketDataBenchmark.testEncode    thrpt   1     30    1     1316.123       19.816   ops/ms


Throughput msg/ms - Before GPB Optimisation
TestProtocol BuffersSBERatio
Car Encode462.817
10436.476
22.52
Car Decode326.018
11657.190
35.76
Market Data Encode1148.050
34078.646
29.68
Market Data Decode1242.252
29193.600
23.50

Throughput msg/ms - After GPB Optimisation
TestProtocol BuffersSBERatio
Car Encode619.467
10436.476
16.85
Car Decode433.711
11657.190
26.88
Market Data Encode2088.998
34078.646
16.31
Market Data Decode1316.123
29193.600
22.18

Thursday, 5 July 2012

Native C/C++ Like Performance For Java Object Serialisation

Do you ever wish you could turn a Java object into a stream of bytes as fast as it can be done in a native language like C++?  If you use standard Java Serialization you could be disappointed with the performance.  Java Serialization was designed for a very different purpose than serialising objects as quickly and compactly as possible.

Why do we need fast and compact serialisation?  Many of our systems are distributed and we need to communicate by passing state between processes efficiently.  This state lives inside our objects.  I've profiled many systems and often a large part of the cost is the serialisation of this state to-and-from byte buffers.  I've seen a significant range of protocols and mechanisms used to achieve this.  At one end of the spectrum are the easy to use but inefficient protocols likes Java Serialisation, XML and JSON.  At the other end of this spectrum are the binary protocols that can be very fast and efficient but they require a deeper understanding and skill.

In this article I will illustrate the performance gains that are possible when using simple binary protocols and introduce a little known technique available in Java to achieve similar performance to what is possible with native languages like C or C++.

The three approaches to be compared are:
  1. Java Serialization: The standard method in Java of having an object implement Serializable.
  2. Binary via ByteBuffer: A simple protocol using the ByteBuffer API to write the fields of an object in binary format.  This is our baseline for what is considered a good binary encoding approach.
  3. Binary via Unsafe: Introduction to Unsafe and its collection of methods that allow direct memory manipulation.  Here I will show how to get similar performance to C/C++.
The Code
import sun.misc.Unsafe;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class TestSerialisationPerf
{
    public static final int REPETITIONS = 1 * 1000 * 1000;

    private static ObjectToBeSerialised ITEM =
        new ObjectToBeSerialised(
            1010L, true, 777, 99,
            new double[]{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
            new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});


    public static void main(final String[] arg) throws Exception
    {
        for (final PerformanceTestCase testCase : testCases)
        {
            for (int i = 0; i < 5; i++)
            {
                testCase.performTest();

                System.out.format("%d %s\twrite=%,dns read=%,dns total=%,dns\n",
                                  i,
                                  testCase.getName(),
                                  testCase.getWriteTimeNanos(),
                                  testCase.getReadTimeNanos(),
                                  testCase.getWriteTimeNanos() + 
                                  testCase.getReadTimeNanos());

                if (!ITEM.equals(testCase.getTestOutput()))
                {
                    throw new IllegalStateException("Objects do not match");
                }

                System.gc();
                Thread.sleep(3000);
            }
        }
    }

    private static final PerformanceTestCase[] testCases =
    {
        new PerformanceTestCase("Serialisation", REPETITIONS, ITEM)
        {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    baos.reset();

                    ObjectOutputStream oos = new ObjectOutputStream(baos);
                    oos.writeObject(item);
                    oos.close();
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    ByteArrayInputStream bais = 
                        new ByteArrayInputStream(baos.toByteArray());
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    object = (ObjectToBeSerialised)ois.readObject();
                }

                return object;
            }
        },

        new PerformanceTestCase("ByteBuffer", REPETITIONS, ITEM)
        {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.clear();
                    item.write(byteBuffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.flip();
                    object = ObjectToBeSerialised.read(byteBuffer);
                }

                return object;
            }
        },

        new PerformanceTestCase("UnsafeMemory", REPETITIONS, ITEM)
        {
            UnsafeMemory buffer = new UnsafeMemory(new byte[1024]);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    item.write(buffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    object = ObjectToBeSerialised.read(buffer);
                }

                return object;
            }
        },
    };
}

abstract class PerformanceTestCase
{
    private final String name;
    private final int repetitions;
    private final ObjectToBeSerialised testInput;
    private ObjectToBeSerialised testOutput;
    private long writeTimeNanos;
    private long readTimeNanos;

    public PerformanceTestCase(final String name, final int repetitions,
                               final ObjectToBeSerialised testInput)
    {
        this.name = name;
        this.repetitions = repetitions;
        this.testInput = testInput;
    }

    public String getName()
    {
        return name;
    }

    public ObjectToBeSerialised getTestOutput()
    {
        return testOutput;
    }

    public long getWriteTimeNanos()
    {
        return writeTimeNanos;
    }

    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    public void performTest() throws Exception
    {
        final long startWriteNanos = System.nanoTime();
        testWrite(testInput);
        writeTimeNanos = (System.nanoTime() - startWriteNanos) / repetitions;

        final long startReadNanos = System.nanoTime();
        testOutput = testRead();
        readTimeNanos = (System.nanoTime() - startReadNanos) / repetitions;
    }

    public abstract void testWrite(ObjectToBeSerialised item) throws Exception;
    public abstract ObjectToBeSerialised testRead() throws Exception;
}

class ObjectToBeSerialised implements Serializable
{
    private static final long serialVersionUID = 10275539472837495L;

    private final long sourceId;
    private final boolean special;
    private final int orderCode;
    private final int priority;
    private final double[] prices;
    private final long[] quantities;

    public ObjectToBeSerialised(final long sourceId, final boolean special,
                                final int orderCode, final int priority,
                                final double[] prices, final long[] quantities)
    {
        this.sourceId = sourceId;
        this.special = special;
        this.orderCode = orderCode;
        this.priority = priority;
        this.prices = prices;
        this.quantities = quantities;
    }

    public void write(final ByteBuffer byteBuffer)
    {
        byteBuffer.putLong(sourceId);
        byteBuffer.put((byte)(special ? 1 : 0));
        byteBuffer.putInt(orderCode);
        byteBuffer.putInt(priority);

        byteBuffer.putInt(prices.length);
        for (final double price : prices)
        {
            byteBuffer.putDouble(price);
        }

        byteBuffer.putInt(quantities.length);
        for (final long quantity : quantities)
        {
            byteBuffer.putLong(quantity);
        }
    }

    public static ObjectToBeSerialised read(final ByteBuffer byteBuffer)
    {
        final long sourceId = byteBuffer.getLong();
        final boolean special = 0 != byteBuffer.get();
        final int orderCode = byteBuffer.getInt();
        final int priority = byteBuffer.getInt();

        final int pricesSize = byteBuffer.getInt();
        final double[] prices = new double[pricesSize];
        for (int i = 0; i < pricesSize; i++)
        {
            prices[i] = byteBuffer.getDouble();
        }

        final int quantitiesSize = byteBuffer.getInt();
        final long[] quantities = new long[quantitiesSize];
        for (int i = 0; i < quantitiesSize; i++)
        {
            quantities[i] = byteBuffer.getLong();
        }

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    public void write(final UnsafeMemory buffer)
    {
        buffer.putLong(sourceId);
        buffer.putBoolean(special);
        buffer.putInt(orderCode);
        buffer.putInt(priority);
        buffer.putDoubleArray(prices);
        buffer.putLongArray(quantities);
    }

    public static ObjectToBeSerialised read(final UnsafeMemory buffer)
    {
        final long sourceId = buffer.getLong();
        final boolean special = buffer.getBoolean();
        final int orderCode = buffer.getInt();
        final int priority = buffer.getInt();
        final double[] prices = buffer.getDoubleArray();
        final long[] quantities = buffer.getLongArray();

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    @Override
    public boolean equals(final Object o)
    {
        if (this == o)
        {
            return true;
        }
        if (o == null || getClass() != o.getClass())
        {
            return false;
        }

        final ObjectToBeSerialised that = (ObjectToBeSerialised)o;

        if (orderCode != that.orderCode)
        {
            return false;
        }
        if (priority != that.priority)
        {
            return false;
        }
        if (sourceId != that.sourceId)
        {
            return false;
        }
        if (special != that.special)
        {
            return false;
        }
        if (!Arrays.equals(prices, that.prices))
        {
            return false;
        }
        if (!Arrays.equals(quantities, that.quantities))
        {
            return false;
        }

        return true;
    }
}

class UnsafeMemory
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final long byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);
    private static final long longArrayOffset = unsafe.arrayBaseOffset(long[].class);
    private static final long doubleArrayOffset = unsafe.arrayBaseOffset(double[].class);

    private static final int SIZE_OF_BOOLEAN = 1;
    private static final int SIZE_OF_INT = 4;
    private static final int SIZE_OF_LONG = 8;

    private int pos = 0;
    private final byte[] buffer;

    public UnsafeMemory(final byte[] buffer)
    {
        if (null == buffer)
        {
            throw new NullPointerException("buffer cannot be null");
        }

        this.buffer = buffer;
    }

    public void reset()
    {
        this.pos = 0;
    }

    public void putBoolean(final boolean value)
    {
        unsafe.putBoolean(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_BOOLEAN;
    }

    public boolean getBoolean()
    {
        boolean value = unsafe.getBoolean(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_BOOLEAN;

        return value;
    }

    public void putInt(final int value)
    {
        unsafe.putInt(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_INT;
    }

    public int getInt()
    {
        int value = unsafe.getInt(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_INT;

        return value;
    }

    public void putLong(final long value)
    {
        unsafe.putLong(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_LONG;
    }

    public long getLong()
    {
        long value = unsafe.getLong(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_LONG;

        return value;
    }

    public void putLongArray(final long[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, longArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public long[] getLongArray()
    {
        int arraySize = getInt();
        long[] values = new long[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, longArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }

    public void putDoubleArray(final double[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, doubleArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public double[] getDoubleArray()
    {
        int arraySize = getInt();
        double[] values = new double[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, doubleArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }
}

Results
2.8GHz Nehalem - Java 1.7.0_04
==============================
0 Serialisation  write=2,517ns read=11,570ns total=14,087ns
1 Serialisation  write=2,198ns read=11,122ns total=13,320ns
2 Serialisation  write=2,190ns read=11,011ns total=13,201ns
3 Serialisation  write=2,221ns read=10,972ns total=13,193ns
4 Serialisation  write=2,187ns read=10,817ns total=13,004ns
0 ByteBuffer     write=264ns   read=273ns    total=537ns
1 ByteBuffer     write=248ns   read=243ns    total=491ns
2 ByteBuffer     write=262ns   read=243ns    total=505ns
3 ByteBuffer     write=300ns   read=240ns    total=540ns
4 ByteBuffer     write=247ns   read=243ns    total=490ns
0 UnsafeMemory   write=99ns    read=84ns     total=183ns
1 UnsafeMemory   write=53ns    read=82ns     total=135ns
2 UnsafeMemory   write=63ns    read=66ns     total=129ns
3 UnsafeMemory   write=46ns    read=63ns     total=109ns
4 UnsafeMemory   write=48ns    read=58ns     total=106ns

2.4GHz Sandy Bridge - Java 1.7.0_04
===================================
0 Serialisation  write=1,940ns read=9,006ns total=10,946ns
1 Serialisation  write=1,674ns read=8,567ns total=10,241ns
2 Serialisation  write=1,666ns read=8,680ns total=10,346ns
3 Serialisation  write=1,666ns read=8,623ns total=10,289ns
4 Serialisation  write=1,715ns read=8,586ns total=10,301ns
0 ByteBuffer     write=199ns   read=198ns   total=397ns
1 ByteBuffer     write=176ns   read=178ns   total=354ns
2 ByteBuffer     write=174ns   read=174ns   total=348ns
3 ByteBuffer     write=172ns   read=183ns   total=355ns
4 ByteBuffer     write=174ns   read=180ns   total=354ns
0 UnsafeMemory   write=38ns    read=75ns    total=113ns
1 UnsafeMemory   write=26ns    read=52ns    total=78ns
2 UnsafeMemory   write=26ns    read=51ns    total=77ns
3 UnsafeMemory   write=25ns    read=51ns    total=76ns
4 UnsafeMemory   write=27ns    read=50ns    total=77ns

Analysis

To write and read back a single relatively small object on my fast 2.4 GHz Sandy Bridge laptop can take ~10,000ns using Java Serialization, whereas when using Unsafe this can come down to well less than 100ns even accounting for the test code itself.  To put this in context, when using Java Serialization the costs are on par with a network hop!  Now that would be very costly if your transport is a fast IPC mechanism on the same system.

There are numerous reasons why Java Serialisation is so costly.  For example it writes out the fully qualified class and field names for each object plus version information.  Also ObjectOutputStream keeps a collection of all written objects so they can be conflated when close() is called.   Java Serialisation requires 340 bytes for this example object, yet we only require 185 bytes for the binary versions.  Details for the Java Serialization format can be found here.  If I had not used arrays for the majority of data, then the serialised object would have been significantly larger with Java Serialization because of the field names.  In my experience text based protocols like XML and JSON can be even less efficient than Java Serialization.  Also be aware that Java Serialization is the standard mechanism employed for RMI.

The real issue is the number of instructions to be executed.  The Unsafe method wins by a significant margin because in Hotspot, and many other JVMs, the optimiser treats these operations as intrinsics and replaces the call with assembly instructions to perform the memory manipulation.  For primitive types this results in a single x86 MOV instruction which can often happen in a single cycle.  The details can be seen by having Hotspot output the optimised code as I described in a previous article.

Now it has to be said that "with great power comes great responsibility" and if you use Unsafe it is effectively the same as programming in C, and with that can come memory access violations when you get offsets wrong.

Adding Some Context

"What about the likes of Google Protocol Buffers?", I hear you cry out.  These are very useful libraries and can often offer better performance and more flexibility than Java Serialisation.  However they are not remotely close to the performance of using Unsafe like I have shown here.  Protocol Buffers solve a different problem and provide nice self-describing messages which work well across languages.  Please test with different protocols and serialisation techniques to compare results.

Also the astute among you will be asking, "What about Endianness (byte-ordering) of the integers written?"  With Unsafe the bytes are written in native order.  This is great for IPC and between systems of the same type.  When systems use differing formats then conversion will be necessary.

How do we deal with multiple versions of a class or determining what class an object belongs to?  I want to keep this article focused but let's say a simple integer to indicate the implementation class is all that is required for a header.  This integer can be used to look up the appropriately implementation for the de-serialisation operation.

An argument I often hear against binary protocols, and for text protocols, is what about being human readable and debugging?  There is an easy solution to this.  Develop a tool for reading the binary format!

Conclusion

In conclusion it is possible to achieve the same native C/C++ like levels of performance in Java for serialising an object to-and-from a byte stream by effectively using the same techniques.  The UnsafeMemory class, for which I've provided a skeleton implementation, could easily be expanded to encapsulate this behaviour and thus protect oneself from many of the potential issues when dealing with such a sharp tool.

Now for the burning question.  Would it not be so much better if Java offered an alternative Marshallable interface to Serializable by offering natively what I've effectively done with Unsafe???

Saturday, 19 May 2012

Applying Back Pressure When Overloaded

How should a system respond when under sustained load?  Should it keep accepting requests until its response times follow the deadly hockey stick, followed by a crash?  All too often this is what happens unless a system is designed to cope with the case of more requests arriving than it is capable of processing.  If we are seeing a sustained arrival rate of requests, greater than our system is capable of processing, then something has to give.  Having the entire system degrade is not the ideal service we want to give our customers.  A better approach would be to process transactions at our systems maximum possible throughput rate, while maintaining a good response time, and rejecting requests above this arrival rate.

Let’s consider a small art gallery as an metaphor.  In this gallery the typical viewer spends on average 20 minutes browsing, and the gallery can hold a maximum of 30 viewers.  If more than 30 viewers occupy the gallery at the same time then customers become unhappy because they cannot have a clear view of the paintings.  If this happens they are unlikely to purchase or return.  To keep our viewers happy it is better to recommend that some viewers visit the café a few doors down and come back when the gallery is less busy.  This way the viewers in the gallery get to see all the paintings without other viewers in the way, and in the meantime those we cannot accommodate enjoy a coffee.  If we apply Little’s Law we cannot have customers arriving at more than 90 per hour, otherwise the maximum capacity is exceeded.  If between 9:00-10:00 they are arriving at 100 per hour, then I’m sure the café down the road will appreciate the extra 10 customers.

Within our systems the available capacity is generally a function of the size of our thread pools and time to process individual transactions.  These thread pools are usually fronted by queues to handle bursts of traffic above our maximum arrival rate.  If the queues are unbounded, and we have a sustained arrival rate above the maximum capacity, then the queues will grow unchecked.  As the queues grow they increasingly add latency beyond acceptable response times, and eventually they will consume all memory causing our systems to fail.  Would it not be better to send the overflow of requests to the café while still serving everyone else at the maximum possible rate?  We can do this by designing our systems to apply “Back Pressure”.

Figure 1.

Separation of concerns encourages good systems design at all levels.  I like to layer a design so that the gateways to third parties are separated from the main transaction services.  This can be achieved by having gateways responsible for protocol translation and border security only.  A typical gateway could be a web container running Servlets.  Gateways accept customer requests, apply appropriate security, and translate the channel protocols for forwarding to the transaction service hosting the domain model.  The transaction service may use a durable store if transactions need to be preserved.  For example, the state of a chat server domain model may not require preservation, whereas a model for financial transactions must be kept for many years for compliance and business reasons.

Figure 1. above is a simplified view of the typical request flow in many systems.  Pools of threads in a gateway accept user requests and forward them to a transaction service.  Let’s assume we have asynchronous transaction services fronted by an input and output queues, or similar FIFO structures.  If we want the system to meet a response time quality-of-service (QoS) guarantee, then we need to consider the three following variables:
  1. The time taken for individual transactions on a thread
  2. The number of threads in a pool that can execute transactions in parallel
  3. The length of the input queue to set the maximum acceptable latency
    max latency = (transaction time / number of threads) * queue length
    queue length = max latency / (transaction time / number of threads)

By allowing the queue to be unbounded the latency will continue to increase.  So if we want to set a maximum response time then we need to limit the queue length.

By bounding the input queue we block the thread receiving network packets which will apply back pressure up stream.  If the network protocol is TCP, similar back pressure is applied via the filling of network buffers, on the sender.  This process can repeat all the way back via the gateway to the customer.  For each service we need to configure the queues so that they do their part in achieving the required quality-of-service for the end-to-end customer experience.

One of the biggest wins I often find is to improve the time taken to process individual transaction latency.  This helps in the best and worst case scenarios.

Worst Case Scenario

Let’s say the queue is unbounded and the system is under sustained heavy load.  Things can begin to go wrong very quickly in subtle ways before memory is exhausted.  What do you think will happen when the queue is larger than the processor cache?  The consumer threads will be suffering cache misses just at the time when they are struggling to keep up, thus compounding the problem.  This can cause a system to get into trouble very quickly and eventually crash.  Under Linux this is particularly nasty because malloc, or one of its friends, will succeed because Linux allows “Over Commit” by default, then later at the point of using that memory, the OOM Killer will start shooting processes. When the OS starts shooting processes, you just know things are not going to end well!

What About Synchronous Designs?

You may say that with synchronous designs there are no queues.  Well not such obvious ones.  If you have a thread pool then it will have a lock, or semaphore, wait queues to assign threads.  If you are crazy enough to allocate a new thread on every request, then once you are over the huge cost of thread creation, your thread is in the run queue for a processor to execute.  Also, these queues involve context switches and condition variables which greatly increase the costs.  You just cannot run away from queues, they are everywhere!  Best to embrace them and design for the quality-of-service your system needs to deliver to its customers.  If we must have queues, then design for them, and maybe choose some nice lock-free ones with great performance.

When we need to support synchronous protocols like REST then use back pressure, signalled by our full incoming queue at the gateway, to send a meaningful “server busy” message such as the HTTP 503 status code.  The customer can then interpret this as time for a coffee and cake at the café down the road.

Subtleties To Watch Out For...

You need to consider the whole end-to-end service.  What if a client is very slow at consuming data from your system?  It could tie up a thread in the gateway taking it out of action.  Now you have less threads working the queue so the response time will be increasing.  Queues and threads need to be monitored, and appropriate action needs to be taken when thresholds are crossed.  For example, when a queue is 70% full, maybe an alert should be raised so an investigation can take place?  Also, transaction times need to be sampled to ensure they are in the expected range.

Summary

If we do not consider how our systems will behave when under heavy load then they will most likely seriously degrade at best, and at worst crash.  When they crash this way, we get to find out if there are any really evil data corruption bugs lurking in those dark places.  Applying back pressure is one effective technique for coping with sustained high-load, such that maximum throughput can be delivered without degrading system performance for the already accepted requests and transactions.

Thursday, 22 March 2012

Fun with my-Channels Nirvana and Azul Zing

Since leaving LMAX I have been neglecting my blog a bit.  This is not because I have not been doing anything interesting.  Quite the opposite really, things have been so busy the blog has taken a back seat.  I’ve been consulting for a number of hedge funds and product companies, most of which are super secretive.

One company I have been spending quite a bit of time with is my-Channels, a messaging provider.  They are really cool and have given me their blessing to blog about some of the interesting things I’ve been working on for them.

For context, my-Channels are a messaging provider that specialise in delivering data to every device known to man over dodgy networks such as the Internet or your corporate WAN.  They can deliver live financial market data to your desktop, laptop at home, or your iPhone, at the fastest possible rates.  Lately, they have made the strategic move to enter the low-latency messaging space for the enterprise, and as part of this they have enlisted my services.  They want to go low-latency without giving up the rich functionality their product offers which is giving me some interesting challenges.

Just how bad is the latency of such a product when new to the low-latency space?  I did not have high expectations because to be fair this was never their goal.  After some initial tests, I’m thinking these guys are not in bad shape.  They beat the crap out of most JMS implementations and it is going to be fun pushing them to the serious end of the low-latency space. 

OK enough of the basic tests, now it is time to get serious.  I worked with them to create appropriate load tests and get the profilers running.  No big surprises here, when we piled on the pressure, lock-contention came out as the biggest culprit limiting both latency and throughput.  As we go down the list, lots of other interesting things showed up but let’s follow good discipline and start at the top of the list.

Good discipline for “Theory of Constraints” states that you always work on the most limiting factor because when it is removed the list below it can change radically as new pressures are applied.  So to address this contention issue we developed a new lock-free Executor to replace the standard Java implementation.  Tests showed this new executor is ~10X better than what the JDK has to offer.  We integrated the new Executor into the code base and now the throughput bottleneck has been massively changed.  The system can now cope with 16X more throughput, and the latency histogram has become much more compressed.  This is a good example of how macro-benchmarking is so much more valuable than micro-benchmarking.  Not a bad start we are all thinking.

Enter Azul Stage Left

We tested on all the major JVMs and the most predictable latency was achieved with Azul Zing.  Zing had by far the best latency profile with virtually no long tail.  For many of the tests it also had the greatest throughput.

After the lock contention on the Executor issue had been resolved, the next big bottleneck when load testing on the same machine was being limited by using TCP between processes over the loopback adapter.  We discussed developing a new transport that was not network based for Nirvana.  For this we decided to apply a number of the techniques I teach on my lock-free concurrency course.  This resulted in a new IPC transport based on shared memory via memory-mapped files in Java.  We did inter-server testing using 10GigE networks, and had a fun using the new Solarflare network adapters with OpenOnload, but for this article I’ll stick with the Java story.  I think Paul is still sore from me stuffing his little Draytek ADSL router with huge amounts of multicast traffic when the poor thing was connected to our 10GigE test LAN.  Sorry Paul!

Developing the IPC transport unearthed a number of challenges with various JVM implementations of MappedByteBuffer.  After some very useful chats with Cliff Click and Doug Lea we came up with a solution that worked across all JVMs.   This solution has a mean latency of ~100ns on the best JVMs and can do ~12-22 million messages per second throughput for 60-byte messages depending on the JVM.  This was the first time we had found a test whereby Azul was not close to being the fastest.   I isolated a test case and sent it to them on a Friday.  On Sunday evening I got an email from Gil Tene saying he had identified the issue and by Tuesday Cliff Click had a fix that we tried the next week.  When we tested the new Azul JVM, we seen over 40 million messages per second at latencies just over 100ns for our new IPC transport.  I had been teasing Azul that this must be possible in Java because I’d created similar algorithms in C and assembler that show what the x86_64 platform is capable of.

I’m starting to ramble but we had great fun removing latency through many parts of the stack.  When I get more time I will blog about some of the other findings.  The current position is still a work in progress with daily progress on an amazing scale.  The guys at my-Channels are very conservative and do not want to publish actual figures until they have version 7.0 of Nirvana ready for GA, and have done more comprehensive testing.  For now they are happy with me being open about the following:
  • Throughput increased 32X due to the implementation of lock-free techniques and optimising the call stack for message handling to remove any shared dependencies.
  • Average latency decreased 20X from applying the same techniques and we have identified many more possible improvements.
  • We know the raw transport for IPC is now ~100ns and the worst case pause due to GC is 80µs with Azul Zing.  As to the latency for the double hop between a producer and consumer over IPC, via their broker, I’ll leave to your imagination as somewhere between those figures until the guys are willing to make an official announcement.  As you can guess it is much much less than 80µs.
For me the big surprise was GC pauses only taking 80µs in the worst case.  OS scheduling alone I have seen result in more jitter.  I discussed this at length with Gil Tene from Azul, and even he was surprised.  He expects some worst case scenarios with their JVM to be 1-2ms for a well behaved application.  We then explored the my-Channels setup, and it turns out we have done everything almost perfectly to get the best out of a JVM which is worth sharing.
  1. Do not use locks in the main transaction flow because they cause context switches, and therefore latency and unpredictable jitter.
  2. Never have more threads that need to run than you have cores available.
  3. Set affinity of threads to cores, or at least sockets, to avoid cache pollution by avoiding migration.  This is particularly important when on a server class machine having multiple sockets because of the NUMA effect.
  4. Ensure uncontested access to any resource respecting the Single Writer Principle so that the likes of biased locking can be your friend.
  5. Keep call stacks reasonably small.  Still more work to do here.  If you are crazy enough to use Spring, then check out your call stacks to see what I mean!  The garbage collector has to walk them finding reachable objects.
  6. Do not use finalizers.
  7. Keep garbage generation to modest levels.  This applies to most JVMs but is likely not an issue for Zing.
  8. Ensure no disk IO on the main flow.
  9. Do a proper warm-up before beginning to measure.
  10. Do all the appropriate OS tunings for low-latency systems that are way beyond this blog.  For example turn off C-States power management in the BIOS and watch out for RHEL 6 as it turns it back on without telling you!
It should be noted that we ran this on some state of the art Intel CPUs with very large L3 caches.  It is possible to get 20-30MB L3 caches on a single socket these days.  It is very likely that our entire application was running out of L3 cache with the exception of the message flow which is very predictable.

Gil has added a cautionary note that while these results are very impressive we had a team focused on this issue with the appropriate skills to get the best out of the application.  It is not the usual case for every client to apply this level of focus.

What I’ve taken from this experience is the amazing things that can be achieved by truly agile companies, staffed by talented individuals, who are empowered to make things happen.  I love agile development but it has become a religion to some people who are more interested in following the “true” process than doing what is truly needed.  Both my-Channels and Azul have shown during this engagement what is possible in making s*#t happen.  It has been an absolute blast working with individuals who can assimilate information and ideas so fast, then turn them into working software.  For this I will embarrass Matt Buckton at my-Channels, and Gil Tene & Cliff Click at Azul who never failed in rising to a challenge.  So few organisations could have made so much progress over such a short time period.  If you think Java cannot cut it in the high performance space, then deal with one of these two companies, and you will be thinking again.  I bet a few months ago Matt never thought he’d be sitting in Singapore airport writing his first multi-producer lock-free queue when travelling home, and really enjoying it.

Tuesday, 22 November 2011

Biased Locking, OSR, and Benchmarking Fun

After my last post on Java Lock Implementations, I got a lot of good feedback about my results and micro-benchmark design approach.  As a result I now understand JVM warmup, On Stack Replacement (OSR) and Biased Locking somewhat better than before.  Special thanks to Dave Dice from Oracle, and Cliff Click & Gil Tene from Azul, for their very useful feedback.

In the last post I concluded, based on my experiments, that biased locking was no longer necessary on modern CPUs.  While this conclusion is understandable given the data gathered in the experiment, it was not valid because the experiment did not take account of some JVM warm up behaviour that I was unaware of.

In this post I will re-run the experiment taking into account the feedback and present some new results.  I shall also expand on the changes I've made to the test and why it is important to consider the JVM warm-up behaviour when writing micro-benchmarks, or even very lean Java applications with quick start up time.

On Stack Replacement (OSR)

Java virtual machines will compile code to achieve greater performance based on runtime profiling.  Some VMs run an interpreter for the majority of code and replace hot areas with compiled code following the 80/20 rule.  Other VMs compile all code simply at first then replace the simple code with more optimised code based on profiling.  Oracle Hotspot and Azul are examples of the first type and Oracle JRockit is an example of the second.

Oracle Hotspot will count invocations of a method return plus branch backs for loops in that method, and if this exceeds 10K in server mode the method will be compiled.  The compiled code on normal JIT'ing can be used when the method is next called.  However if a loop is still iterating it may make sense to replace the method before the loop completes, especially if it has many iterations to go.  OSR is the means by which a method gets replaced with a compiled version part way through iterating a loop.

I was under the impression that normal JIT'ing and OSR would result in similar code.  Cliff Click pointed out that it is much harder for a runtime to optimise a loop part way through, and especially difficult if nested.  For example, bounds checking within the loop may not be possible to eliminate. Cliff will blog in more detail on this shortly.

What this means is that you are likely to get better optimised code by doing a small number of shorter warm ups than a single large one.  You can see in the code below how I do 10 shorter runs in a loop before the main large run compared to the last article where I did a single large warm-up run.

Biased Locking

Dave Dice pointed out that Hotspot does not enable objects for biased locking in the first few seconds (4s at present) of JVM startup. This is because some benchmarks, and NetBeans, have a lot of thread contention on start up and the revocation cost is significant.

All objects by default are created with biased locking enabled in Oracle Hotspot after the first few seconds of start-up delay, and can be configured with -XX:BiasedLockingStartupDelay=0.

This point, combined with knowing more about OSR, is important for micro-benchmarks.  It is also important to be aware of these points if you have a lean Java application that starts in a few seconds.

The Code
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CyclicBarrier;

import static java.lang.System.out;

public final class TestLocks implements Runnable
{
    public enum LockType {JVM, JUC}
    public static LockType lockType;

    public static final long WARMUP_ITERATIONS = 100L * 1000L;
    public static final long ITERATIONS = 500L * 1000L * 1000L;
    public static long counter = 0L;

    public static final Object jvmLock = new Object();
    public static final Lock jucLock = new ReentrantLock();
    private static int numThreads;

    private final long iterationLimit;
    private final CyclicBarrier barrier;

    public TestLocks(final CyclicBarrier barrier, final long iterationLimit)
    {
        this.barrier = barrier;
        this.iterationLimit = iterationLimit;
    }

    public static void main(final String[] args) throws Exception
    {
        lockType = LockType.valueOf(args[0]);
        numThreads = Integer.parseInt(args[1]);

        for (int i = 0; i < 10; i++)
        {
            runTest(numThreads, WARMUP_ITERATIONS);
            counter = 0L;
        }

        final long start = System.nanoTime();
        runTest(numThreads, ITERATIONS);
        final long duration = System.nanoTime() - start;

        out.printf("%d threads, duration %,d (ns)\n", numThreads, duration);
        out.printf("%,d ns/op\n", duration / ITERATIONS);
        out.printf("%,d ops/s\n", (ITERATIONS * 1000000000L) / duration);
        out.println("counter = " + counter);
    }

    private static void runTest(final int numThreads, final long iterationLimit)
        throws Exception
    {
        CyclicBarrier barrier = new CyclicBarrier(numThreads);
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < threads.length; i++)
        {
            threads[i] = new Thread(new TestLocks(barrier, iterationLimit));
        }

        for (Thread t : threads)
        {
            t.start();
        }

        for (Thread t : threads)
        {
            t.join();
        }
    }

    public void run()
    {
        try
        {
            barrier.await();
        }
        catch (Exception e)
        {
            // don't care
        }

        switch (lockType)
        {
            case JVM: jvmLockInc(); break;
            case JUC: jucLockInc(); break;
        }
    }

    private void jvmLockInc()
    {
        long count = iterationLimit / numThreads;
        while (0 != count--)
        {
            synchronized (jvmLock)
            {
                ++counter;
            }
        }
    }

    private void jucLockInc()
    {
        long count = iterationLimit / numThreads;
        while (0 != count--)
        {
            jucLock.lock();
            try
            {
                ++counter;
            }
            finally
            {
                jucLock.unlock();
            }
        }
    }
}

Script to run tests:

set -x
for i in {1..8}
do 
    java -server -XX:-UseBiasedLocking TestLocks JVM $i
done

for i in {1..8}
do 
    java -server -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 TestLocks JVM $i
done

for i in {1..8}
do 
    java -server TestLocks JUC $i
done

Results

The tests are carried out with 64-bit Linux (Fedora Core 15) and Oracle JDK 1.6.0_29.

Nehalem 2.8GHz - Ops/Sec
Threads-UseBiasedLocking+UseBiasedLockingReentrantLock
153,283,461
450,950,969
62,876,566
218,519,295
18,108,615
10,217,186
313,349,605
13,416,198
14,108,622
48,120,172
8,040,773
14,207,310
54,725,114
4,551,766
14,302,683
65,133,706
5,246,548
14,676,616
75,473,652
5,585,666
18,145,525
85,514,056
5,414,171
19,010,725


Sandy Bridge 2.0GHz - Ops/Sec
Threads-UseBiasedLocking+UseBiasedLockingReentrantLock
1
34,500,407
396,511,324
43,148,808
2
20,899,076
19,742,639
6,038,923
3
9,288,039
11,957,032
24,147,807
4
5,618,862
5,589,289
9,082,961
5
5,609,932
5,592,574
9,389,243
6
5,742,907
5,760,558
12,518,728
7
6,699,201
6,641,886
13,684,475
8
6,957,824
6,925,410
14,819,005

Observations
  1. Biased locking has a huge benefit in the un-contended single threaded case.
  2. Biased locking when un-contended, and not revoked, only adds 4-5 cycles of cost.  This is the cost when having a cache hit for the lock structures, on top of the code protected in the critical section.
  3. -XX:BiasedLockingStartupDelay=0 needs to be set for lean applications and micro-benchmarks.
  4. Avoiding OSR does not make a material difference to this set of test results.  This is likely to be because the loop is so simple or other costs are dominating.
  5. For the current implementations, ReentrantLocks scale better than synchronised locks under contention, except in the case of 2 contending threads.
Conclusion

My tests in the last post are invalid for the testing of an un-contended biased lock, because the lock was not actually biased.  If you are designing code following the single writer principle, and therefore having un-contended locks when using 3rd party libraries, then having biased locking enabled is a significant performance boost.

Saturday, 5 November 2011

Locks & Condition Variables - Latency Impact

In a previous article on Inter-Thread Latency I showed how it is possible to signal a state change between 2 threads with less than 50ns of latency.  To many developers, writing concurrent code using locks is a scary experience.  Writing concurrent code using lock-free algorithms, i.e. algorithms that rely on the use of memory barriers and an intimate understanding of the underlying memory models, can be totally terrifying.  To me lock-free / non-blocking algorithms are like playing with explosives or corrosive chemicals, if you do not understand what you are doing, or show the ultimate respect, then very bad things can, and most likely will, happen!

In this article, I'd like to illustrate the impact of using locks and the resulting latency they can impose on your designs.  I want to use a very similar algorithm to that used in my previous inter-thread latency article to illustrate the ping-pong effect of handing control back and forth between 2 threads.  In this case, rather than using a couple of volatile variables, I will employ a pair of condition variables to signal a state change so control can be passed back and forth.

The Code
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.System.out;

public final class LockedSignallingLatency
{
    private static final int ITERATIONS = 10 * 1000 * 1000;

    private static final Lock lock = new ReentrantLock();
    private static final Condition sendCondition = lock.newCondition();
    private static final Condition echoCondition = lock.newCondition();

    private static long sendValue = -1L;
    private static long echoValue = -1L;

    public static void main(final String[] args)
        throws Exception
    {
        final Thread sendThread = new Thread(new SendRunner());
        final Thread echoThread = new Thread(new EchoRunner());

        final long start = System.nanoTime();

        echoThread.start();
        sendThread.start();

        sendThread.join();
        echoThread.join();

        final long duration = System.nanoTime() - start;

        out.printf("duration %,d (ns)\n", duration);
        out.printf("%,d ns/op\n", duration / (ITERATIONS * 2L));
        out.printf("%,d ops/s\n", (ITERATIONS * 2L * 1000000000L) / duration);
    }

    public static final class SendRunner implements Runnable
    {
        public void run()
        {
            for (long i = 0; i < ITERATIONS; i++)
            {
                lock.lock();
                try
                {
                    sendValue = i;
                    sendCondition.signal();
                }
                finally
                {
                    lock.unlock();
                }

                lock.lock();
                try
                {
                    while (echoValue != i)
                    {
                        echoCondition.await();
                    }
                }
                catch (final InterruptedException ex)
                {
                    break;
                }
                finally
                {
                    lock.unlock();
                }

            }
        }
    }

    public static final class EchoRunner implements Runnable
    {
        public void run()
        {
            for (long i = 0; i < ITERATIONS; i++)
            {
                lock.lock();
                try
                {
                    while (sendValue != i)
                    {
                        sendCondition.await();
                    }
                }
                catch (final InterruptedException ex)
                {
                    break;
                }
                finally
                {
                    lock.unlock();
                }

                lock.lock();
                try
                {
                    echoValue = i;
                    echoCondition.signal();
                }
                finally
                {
                    lock.unlock();
                }
            }
        }
    }
}
Test Results

Windows 7 Professional 64-bit - Oracle JDK 1.6.0 - Nehalem 2.8 GHz

$ start /AFFINITY 0x14 /B /WAIT java LockedSignallingLatency
duration 41,649,616,343 (ns)
2,082 ns/op
480,196 ops/s

$ java LockedSignallingLatency
duration 73,789,456,491 (ns)
3,689 ns/op
271,041 ops/s

Linux Fedora Core 15 64-bit - Oracle JDK 1.6.0 - Nehalem 2.8 GHz

$ taskset -c 2,4 java LockedSignallingLatency
duration 40,469,689,559 (ns)
2,023 ns/op
494,197 ops/s

$ java LockedSignallingLatency
duration 169,795,756,230 (ns)
8,489 ns/op
117,788 ops/s

Linux Fedora Core 15 64-bit - Oracle JDK 1.6.0 - Sandybridge 2.0 GHz

$ taskset -c 2,4 java LockedSignallingLatency
duration 47,209,549,484 (ns)
2,360 ns/op
423,643 ops/s

$ java LockedSignallingLatency
duration 336,168,489,093 (ns)
16,808 ns/op
59,493 ops/s

Observations

The above is a typical set of results I've seen in the middle of the range from multiple runs.  There are a couple of interesting observations I'd like to expand on.

Firstly, this is 3 orders of magnitude greater latency than what I illustrated in the previous article using just memory barriers to signal between threads.  This cost comes about because the kernel needs to get involved to arbitrate between the threads for the lock, and then manage the scheduling for the threads to awaken when the condition is signalled.  The one-way latency to signal a change is pretty much the same as what is considered current state of the art for network hops between nodes via a switch.  It is possible to get ~1µs latency with InfiniBand and less than 5µs with 10GigE and user-space IP stacks.

Secondly, the impact is clear when letting the OS choose what CPUs the threads get scheduled on rather than pinning them manually.  I've observed this same issue across many use cases whereby Linux, in default configuration for its scheduler, will greatly impact the performance of a low-latency system by scheduling threads on different cores resulting in cache pollution.   Windows by default seems to make a better job of this.

I recently had an interesting discussion with Cliff Click about using condition variables and their cost.  He pointed out a problem he was seeing.  If you look at the case where a sleeping thread gets signalled within the lock, it goes to run and then discovers it cannot get the lock because the signalling thread already has the lock, so it gets put back to sleep until the signalling thread releases the lock, thus causing more work than necessary.  Modern schedulers would benefit from being more aware of communication mechanisms between threads to have more efficient location and rescheduling logic.  As we go more concurrent and parallel our schedulers need to become more aware of IPC mechanisms.

Conclusion

When designing a low-latency system it is crucial to avoid the use of locks and condition variables for the main transaction flows.  Non-blocking or lock-free algorithms are key to achieving ultra-low latency but can be very difficult to prove correct.   I would not recommend designing lock-free algorithms for business logic but they can be very effectively employed for low-level infrastructure components.  The business logic is best run on single threads following the Single Writer Principle from my previous article.