Program Listing for File PDJE_Buffer.hpp
↰ Return to documentation for file (include/global/DataLines/PDJE_Buffer.hpp)
#pragma once
#include "PDJE_EXPORT_SETTER.hpp"
#include "ipc_shared_memory.hpp"
#include <atomic>
#include <memory_resource>
#include <random>
#include <vector>
namespace fs = std::filesystem;
template <typename T> class PDJE_Buffer_Arena {
private:
PDJE_IPC::SharedMem<T, PDJE_IPC::PDJE_IPC_RW> buf1;
PDJE_IPC::SharedMem<T, PDJE_IPC::PDJE_IPC_RW> buf2;
PDJE_IPC::SharedMem<uint64_t, PDJE_IPC::PDJE_IPC_RW> first_count;
PDJE_IPC::SharedMem<uint64_t, PDJE_IPC::PDJE_IPC_RW> second_count;
PDJE_IPC::SharedMem<std::atomic_flag, PDJE_IPC::PDJE_IPC_RW> lock;
PDJE_IPC::SharedMem<uint8_t, PDJE_IPC::PDJE_IPC_RW> buf_first;
T *buffer_first_pointer_cache = nullptr;
T *buffer_second_pointer_cache = nullptr;
public:
uint64_t BUFFER_COUNT = 0;
std::filesystem::path ID;
void
Write(const T &data);
std::pair<T *, uint64_t>
Get();
PDJE_Buffer_Arena(const std::string& id, const uint64_t count)
{
BUFFER_COUNT = count;
buf1.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_FIRST") + id), BUFFER_COUNT);
buf2.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_SECOND") + id), BUFFER_COUNT);
first_count.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_COUNT_FIRST") + id), 1);
second_count.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_COUNT_SECOND") + id), 1);
lock.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_LOCK") + id), 1);
buf_first.GetIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUFFER_SWITCH") + id), 1);
buffer_first_pointer_cache = buf1.ptr;
buffer_second_pointer_cache = buf2.ptr;
}
PDJE_Buffer_Arena(const uint64_t count)
{
BUFFER_COUNT = count;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int> dis(0, INT_MAX);
ID = fs::path(std::to_string(dis(gen)));
buf1.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_FIRST") +
ID.string()),BUFFER_COUNT);
buf2.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_SECOND") +
ID.string()), BUFFER_COUNT);
first_count.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_COUNT_FIRST") +
ID.string()), 1);
second_count.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUF_COUNT_SECOND") +
ID.string()), 1);
lock.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_LOCK") +
ID.string()), 1);
buf_first.MakeIPCSharedMemory(
fs::path(std::string("PDJE_INPUT_SHMEM_BUFFER_SWITCH") +
ID.string()), 1);
buffer_first_pointer_cache = buf1.ptr;
buffer_second_pointer_cache = buf2.ptr;
new (lock.ptr) std::atomic_flag();
*first_count.ptr = 0;
*second_count.ptr = 0;
*buf_first.ptr = 1;
lock.ptr->clear(std::memory_order_relaxed);
}
~PDJE_Buffer_Arena() = default;
};
template <typename T>
void
PDJE_Buffer_Arena<T>::Write(const T &data)
{
while (lock.ptr->test_and_set(std::memory_order_acquire)) {
}
// locked
if (*buf_first.ptr == 1) {
if ((*first_count.ptr) >= BUFFER_COUNT) {
lock.ptr->clear(std::memory_order_release); // unlock
return;
}
buf1.ptr[*(first_count.ptr)] = data;
++(*first_count.ptr);
} else {
if ((*second_count.ptr) >= BUFFER_COUNT) {
lock.ptr->clear(std::memory_order_release); // unlock
return;
}
buf2.ptr[(*second_count.ptr)] = data;
++(*second_count.ptr);
}
lock.ptr->clear(std::memory_order_release); // unlock
}
template <typename T>
std::pair<T *, uint64_t>
PDJE_Buffer_Arena<T>::Get()
{
while (lock.ptr->test_and_set(std::memory_order_acquire)) {
}
if (*buf_first.ptr == 1) {
(*second_count.ptr) = 0;
} else {
(*first_count.ptr) = 0;
}
(*buf_first.ptr) = (*buf_first.ptr == 1) ? 0 : 1;
lock.ptr->clear(std::memory_order_release);
if ((*buf_first.ptr) == 1) {
return std::pair(buffer_second_pointer_cache, (*second_count.ptr));
} else {
return std::pair(buffer_first_pointer_cache, (*first_count.ptr));
}
}