티스토리 뷰

카테고리 없음

Magic ringbuffer

newpolaris 2020. 2. 15. 17:20

Filament의 소스의 command buffer는 circularBuffer를 이용하여 구현되어있다.

그 중 CircularBuffer (ring buffer) 구현이 독특해서 알아봤다

magic ringbuffer 라는 트릭은,

mmap으로 공간을 2배로 할당받은 뒤, 그 할당받은 영역의 주소를 반으로 나눠

다시 mmap으로 매핑하는데, 이때 같은 대상을 가르키게 한다 (여기선 fd)

그럼 첫번째 버퍼를 size 보다 크게 기록하는 오버플로우 시,

붙어있던 2번째 버퍼의 영역에 기록을 하게 되는데

메모리 유닛이 이때, 같은 대상을 가르키므로,

자동으로 원래 버퍼의 처음 영역에 기록하여준다

아래 링크의 이미지를 보는게 이해하기 쉽다

https://github.com/willemt/cbuffer/blob/master/README.rst

https://github.com/smcho-kr/magic-ring-buffer/blob/master/README.md

여러 설명과 환경별 구현은

https://www.slideshare.net/urakarin/magic-ring-buffer

#include <iostream>
#include <cassert>
#include <sys/mman.h>
#include <unistd.h>

void* cbuffer_new(const size_t size)
{
    char path[] = "/tmp/cb-XXXXXX";
    int fd, status;
    void* address;

    fd = mkstemp(path);
    if (fd < 0)
        abort();

    status = unlink(path);
    if (status)
        abort();
    status = ftruncate(fd, size);
    if (status)
        abort();

    void* data = (char*)mmap(NULL, size*2, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (data == MAP_FAILED)
        abort();
    address = (char*)mmap(data, size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    if (address != data)
        abort();
    address = (char*)mmap((char*)data + size, size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);

    if (address == MAP_FAILED)
        abort();

    if (address != (char*)data + size)
        abort();

    status = close(fd);
    return data;
}

int main(int argc, const char * argv[]) {
    auto dst = (uint8_t*)cbuffer_new(4096);
    for (int i = 0; i < 20; i++)
        dst[i + 4090] = i;
    for (int i = 4090; i < 4100; i++)
        printf("%d %d\n", i, dst[i]);
    for (int i = 0; i < 30; i++)
        printf("%d %d\n", i, dst[i]);
    return 0;
}

아래와 같이 4096을 초과한 값이 0 부터 계속 기록된다

물론 한계는 4092 x 2 배만 설정해뒀기에 계속 증가할 순 없다

그렇기에 단순, 선형메모리가 요구되는 곳에서 일부 코드를 몇줄 줄이는 정도로 밖에 못쓰는 듯

4090 0
4091 1
4092 2
4093 3
4094 4
4095 5
4096 6
4097 7
4098 8
4099 9
0 6
1 7
2 8
3 9
4 10
5 11
6 12
7 13
8 14
9 15

https://lo.calho.st/posts/black-magic-buffer/

극단적인 예제

    double slow_start = microtime();
    offset = 0;
    for(int i = 0; i < NUMBER_RUNS; i++){
        const size_t part1 = min(MESSAGE_SIZE, BUFFER_SIZE - offset);
        const size_t part2 = MESSAGE_SIZE - part1;
        memcpy(buffer + offset, message, part1);
        memcpy(buffer, message + part1, part2);
        offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
    }
    double slow_stop = microtime();

    double fast_start = microtime();
    offset = 0;
    for(int i = 0; i < NUMBER_RUNS; i++){
        memcpy(&buffer[offset], message, MESSAGE_SIZE);
        offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
    }
    double fast_stop = microtime();

그런데, Filament 에서는 별도의 flag 설정을 몇개 더 한 후

data 의 앞부분을 memset으로 초기화 한다;

별도의 flag 설정이 없을 경우, 이전 버퍼에 기록이 되는데

#include <iostream>
#include <cassert>
#include <sys/mman.h>
#include <unistd.h>

void* cbuffer_new(const size_t size)
{
    char path[] = "/tmp/cb-XXXXXX";
    int fd, status;
    void* address;

    fd = mkstemp(path);
    if (fd < 0)
        abort();

    status = unlink(path);
    if (status)
        abort();
    status = ftruncate(fd, size);
    if (status)
        abort();

    void* data = (char*)mmap(NULL, size*2, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (data == MAP_FAILED)
        abort();
    munmap(data, size*2);
    address = (char*)mmap(data, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (address != data)
        abort();
    address = (char*)mmap((char*)data + size, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);

    if (address == MAP_FAILED)
        abort();

    if (address != (char*)data + size)
        abort();

    status = close(fd);
    return data;
}

int main(int argc, const char * argv[]) {
    assert(4096 % getpagesize() == 0);

    auto dst = (uint8_t*)cbuffer_new(4096);
    for (int i = 0; i < 50; i++)
        dst[i + 4090] = i;
    for (int i = 0; i < 10; i++)
        dst[i] = 117;
    for (int i = 4090; i < 4100; i++)
        printf("%d %d\n", i, dst[i]);
    for (int i = 0; i < 30; i++)
        printf("%d %d\n", i, dst[i]);
    return 0;
}

filagment의 flag 설정 대로 라면, private로 인해 overwrite 해도 0부터 반영되지 않는다

아래는 filament의 circularize() 함수

        intptr_t overflow = intptr_t(mHead) - (intptr_t(mData) + ssize_t(mSize));
        if (overflow >= 0) {
            assert(size_t(overflow) <= mSize);
            mHead = (void *) (intptr_t(mData) + overflow);
            #ifndef NDEBUG
            memset(mData, 0xA5, size_t(overflow));
            #endif
        }

그래서 memset을 해도되는 듯,

그런데 그러면 mHead는 왜 저런 포지션을 가지는 거고;

애초에 2배 매모리쓰는 거 보다 복잡합하기만 한거 아닌가

#include <iostream>
#include <cassert>
#include <sys/mman.h>
#include <unistd.h>

void* cbuffer_new(const size_t size)
{
    char path[] = "/tmp/cb-XXXXXX";
    int fd, status;
    void* address;

    fd = mkstemp(path);
    if (fd < 0)
        abort();

    status = unlink(path);
    if (status)
        abort();
    status = ftruncate(fd, size);
    if (status)
        abort();

    void* data = (char*)mmap(NULL, size*2, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (data == MAP_FAILED)
        abort();
    munmap(data, size*2);
    address = (char*)mmap(data, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (address != data)
        abort();
    address = (char*)mmap((char*)data + size, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);

    if (address == MAP_FAILED)
        abort();

    if (address != (char*)data + size)
        abort();

    status = close(fd);
    return data;
}
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>

#define BUFFER_SIZE (4096*9)
#define MESSAGE_SIZE (32)
#define NUMBER_RUNS (1000000)

static inline double microtime() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return 1e6 * tv.tv_sec + tv.tv_usec;
}

static inline off_t min(off_t a, off_t b) {
    return a < b ? a : b;
}

int main() {
    uint8_t message[MESSAGE_SIZE];
    char* buffer = (char*)malloc(2*BUFFER_SIZE);

    char* buffer2 = (char*)cbuffer_new(BUFFER_SIZE);

    size_t offset;

    double slow_start = microtime();
    offset = 0;
    for(int i = 0; i < NUMBER_RUNS; i++){
        const size_t part1 = min(MESSAGE_SIZE, BUFFER_SIZE - offset);
        const size_t part2 = MESSAGE_SIZE - part1;
        memcpy(buffer + offset, message, part1);
        memcpy(buffer, message + part1, part2);
        asm volatile("" : : "r,m"(buffer) : "memory");

        offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
    }
    double slow_stop = microtime();

    double fast_start = microtime();
    offset = 0;
    for(int i = 0; i < NUMBER_RUNS; i++){
        memcpy(&buffer2[offset], message, MESSAGE_SIZE);
        asm volatile("" : : "r,m"(buffer) : "memory");

        offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
    }
    double fast_stop = microtime();

    printf("slow: %f microseconds per write\n", (slow_stop - slow_start) / NUMBER_RUNS);
    printf("fast: %f microseconds per write\n", (fast_stop - fast_start) / NUMBER_RUNS);

    return 0;
}

대강 결과는 100만번 반복시,

4096

slow: 0.007676 microseconds per write

fast: 0.001575 microseconds per write

4096*9

slow: 0.007164 microseconds per write

fast: 0.003944 microseconds per write

먼가 옵티마이즈 된거 같은데; 이부분자체가 오래 걸리는 부분도 아니고;

memcpy를 2번 안하고 링버퍼 만으로 쓰기위해 그냥 넘치는 부분을 둔다고 하면

거의 똑같이 나온다

slow: 0.001626 microseconds per write

fast: 0.001665 microseconds per write

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크