diff options
author | Jan Huwald <jh@sotun.de> | 2012-05-07 20:01:51 (GMT) |
---|---|---|
committer | Jan Huwald <jh@sotun.de> | 2012-05-07 20:01:51 (GMT) |
commit | 420d2ef464d4a741028e132e662d5626806a41f5 (patch) | |
tree | 1aca6eb512e4ed0fb5f3c10c528cb998b6ffd695 /core/heap.hpp |
Diffstat (limited to 'core/heap.hpp')
-rw-r--r-- | core/heap.hpp | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/core/heap.hpp b/core/heap.hpp new file mode 100644 index 0000000..40c0a55 --- /dev/null +++ b/core/heap.hpp @@ -0,0 +1,157 @@ +#ifndef lxZLmAQ0qUwCFC9Ehehzy92Ldko +#define lxZLmAQ0qUwCFC9Ehehzy92Ldko + +/* warn: heap is sloppy with the write barrier of it's content + mmap()ed data store; the stated barrier is always equal or less + than the actually used amount of space; it is only correct after + sync() calls (which happen implicitely at interval turnover */ + +#include <inttypes.h> +#include <boost/tuple/tuple.hpp> + +#include "checkpoint.hpp" +#include "everything_else.hpp" +#include "scalar.hpp" +#include "string_helpers.hpp" +#include "time.hpp" +#include "vector.hpp" + +template<typename Payload> +struct Heap { + // constructor requires a 3-tuple: + // 1. name (used for file name creation) + // 2. average size of contained object + // 3. default payload value (if a new heap is created) + Heap(const string name, const uint64_t avgLength); + Heap(); + + // read & write access + inline Payload & operator() (Time t) __attribute__ ((pure)); + void sync(); + + // intern methods + void addCheckpoints(Time t); + + // intern data + typedef Vector<char>::ptr_t ptr_t; + ptr_t interval; // current interval (number) + Time timeLimit; // current intervals time limit (inclusive) + Time currentTime; // current intervals time (same as times(interval) + // but allows compiler to perform better + // optimizations) + + + // file backings + Vector<Time> times; // times of the payload + Vector<char> content; // the payloads (in temporally ascending order) + Vector<ptr_t> index; + +private: + inline Payload & get(ptr_t id) __attribute__ ((pure)); + void init(); +}; + +template<typename Payload> +Heap<Payload>::Heap(const string name, const uint64_t avgLength) : + times( "heap_times_" + name, maxCheckpoints), + content("heap_content_" + name, maxCheckpoints * avgLength), + index( "heap_index_" + name, maxCheckpoints) +{ + init(); +} + +template<typename Payload> +Heap<Payload>::Heap() : + times( "heap_times_" + std::string(Payload::payload_t::quant_t::name), + maxCheckpoints), + content("heap_content_" + std::string(Payload::payload_t::quant_t::name), + maxCheckpoints * AverageQueueSize<typename Payload::payload_t::quant_t>::value * Payload::elementSize), + index( "heap_index_" + std::string(Payload::payload_t::quant_t::name), + maxCheckpoints) +{ + init(); +} + +template<typename Payload> +void Heap<Payload>::init() { + // do we create the heap (and backing files)? + if (index.barrierWrite() == 0) { + // yes -> extra initialisation + interval = 0; + timeLimit = 0; + currentTime = 0; + times(0) = 0; + times.barrierWrite() = 1; + // new(&(content(0))) Payload(boost::get<2>(arg)); // init with given arg + new(&(content(0))) Payload(); // init with empty ctor + content.barrierWrite() = get(0).getSize(); + index(0) = 0; + index.barrierWrite() = 1; + }else{ + // no -> restore previous setup + interval = index.barrierWrite() - 1; + timeLimit = Time(checkpointInterval) * interval; // hint interval #0 ends with time 0 + currentTime = times(interval); + } +} + + +template<typename Payload> +inline Payload& Heap<Payload>::operator() (Time t) { + if (t > currentTime) { + if (unlikely(t > timeLimit)) addCheckpoints(t); + currentTime = t; + times(interval) = t; + } + return get(interval); +} + +template<typename Payload> +void Heap<Payload>::sync() { + // update length of the current heap + // WARN: for synchronization these values actually should never + // shrink, but the embedded container might violate this + content.barrierWrite() = index(interval) + get(interval).getSize(); + + // sync all mmap containers + content.sync(); + times.sync(); + index.sync(); +} + +// see checkpoint.hpp for a more commented analogon of this method +template<typename Payload> +void Heap<Payload>::addCheckpoints(Time t) { + assert(index.barrierWrite() == times.barrierWrite()); // is a pre- and postcondition + + while (t > timeLimit) { + content.barrierWrite() = index(interval) + get(interval).getSize(); // update current payload size + interval++; + index(interval) = content.barrierWrite(); + times(interval) = times(interval - 1); + + // invoke copy constructor via placement new to copy the payload + new(&get(interval)) Payload(get(interval-1)); + + index.barrierWrite()++; + times.barrierWrite()++; + times.barrierRead() = times.barrierWrite() - 1; + index.barrierRead() = index.barrierWrite() - 1; + content.barrierRead() = content.barrierWrite(); + + timeLimit = checkpointInterval * interval; + } + + currentTime = t; + times(interval) = t; + + assert(index.barrierWrite() == times.barrierWrite()); + + content.advise(0, content.barrierRead(), MADV_DONTNEED); +} + +template<typename Payload> +inline Payload& Heap<Payload>::get (ptr_t id) { + return *((Payload*) (&(content(index(id))))); +} +#endif // lxZLmAQ0qUwCFC9Ehehzy92Ldko |