summaryrefslogtreecommitdiff
path: root/core/heap.hpp
diff options
context:
space:
mode:
authorJan Huwald <jh@sotun.de>2012-05-07 20:01:51 (GMT)
committerJan Huwald <jh@sotun.de>2012-05-07 20:01:51 (GMT)
commit420d2ef464d4a741028e132e662d5626806a41f5 (patch)
tree1aca6eb512e4ed0fb5f3c10c528cb998b6ffd695 /core/heap.hpp
Initial commitHEADmaster
Diffstat (limited to 'core/heap.hpp')
-rw-r--r--core/heap.hpp157
1 files changed, 157 insertions, 0 deletions
diff --git a/core/heap.hpp b/core/heap.hpp
new file mode 100644
index 0000000..40c0a55
--- /dev/null
+++ b/core/heap.hpp
@@ -0,0 +1,157 @@
+#ifndef lxZLmAQ0qUwCFC9Ehehzy92Ldko
+#define lxZLmAQ0qUwCFC9Ehehzy92Ldko
+
+/* warn: heap is sloppy with the write barrier of it's content
+ mmap()ed data store; the stated barrier is always equal or less
+ than the actually used amount of space; it is only correct after
+ sync() calls (which happen implicitely at interval turnover */
+
+#include <inttypes.h>
+#include <boost/tuple/tuple.hpp>
+
+#include "checkpoint.hpp"
+#include "everything_else.hpp"
+#include "scalar.hpp"
+#include "string_helpers.hpp"
+#include "time.hpp"
+#include "vector.hpp"
+
+template<typename Payload>
+struct Heap {
+ // constructor requires a 3-tuple:
+ // 1. name (used for file name creation)
+ // 2. average size of contained object
+ // 3. default payload value (if a new heap is created)
+ Heap(const string name, const uint64_t avgLength);
+ Heap();
+
+ // read & write access
+ inline Payload & operator() (Time t) __attribute__ ((pure));
+ void sync();
+
+ // intern methods
+ void addCheckpoints(Time t);
+
+ // intern data
+ typedef Vector<char>::ptr_t ptr_t;
+ ptr_t interval; // current interval (number)
+ Time timeLimit; // current intervals time limit (inclusive)
+ Time currentTime; // current intervals time (same as times(interval)
+ // but allows compiler to perform better
+ // optimizations)
+
+
+ // file backings
+ Vector<Time> times; // times of the payload
+ Vector<char> content; // the payloads (in temporally ascending order)
+ Vector<ptr_t> index;
+
+private:
+ inline Payload & get(ptr_t id) __attribute__ ((pure));
+ void init();
+};
+
+template<typename Payload>
+Heap<Payload>::Heap(const string name, const uint64_t avgLength) :
+ times( "heap_times_" + name, maxCheckpoints),
+ content("heap_content_" + name, maxCheckpoints * avgLength),
+ index( "heap_index_" + name, maxCheckpoints)
+{
+ init();
+}
+
+template<typename Payload>
+Heap<Payload>::Heap() :
+ times( "heap_times_" + std::string(Payload::payload_t::quant_t::name),
+ maxCheckpoints),
+ content("heap_content_" + std::string(Payload::payload_t::quant_t::name),
+ maxCheckpoints * AverageQueueSize<typename Payload::payload_t::quant_t>::value * Payload::elementSize),
+ index( "heap_index_" + std::string(Payload::payload_t::quant_t::name),
+ maxCheckpoints)
+{
+ init();
+}
+
+template<typename Payload>
+void Heap<Payload>::init() {
+ // do we create the heap (and backing files)?
+ if (index.barrierWrite() == 0) {
+ // yes -> extra initialisation
+ interval = 0;
+ timeLimit = 0;
+ currentTime = 0;
+ times(0) = 0;
+ times.barrierWrite() = 1;
+ // new(&(content(0))) Payload(boost::get<2>(arg)); // init with given arg
+ new(&(content(0))) Payload(); // init with empty ctor
+ content.barrierWrite() = get(0).getSize();
+ index(0) = 0;
+ index.barrierWrite() = 1;
+ }else{
+ // no -> restore previous setup
+ interval = index.barrierWrite() - 1;
+ timeLimit = Time(checkpointInterval) * interval; // hint interval #0 ends with time 0
+ currentTime = times(interval);
+ }
+}
+
+
+template<typename Payload>
+inline Payload& Heap<Payload>::operator() (Time t) {
+ if (t > currentTime) {
+ if (unlikely(t > timeLimit)) addCheckpoints(t);
+ currentTime = t;
+ times(interval) = t;
+ }
+ return get(interval);
+}
+
+template<typename Payload>
+void Heap<Payload>::sync() {
+ // update length of the current heap
+ // WARN: for synchronization these values actually should never
+ // shrink, but the embedded container might violate this
+ content.barrierWrite() = index(interval) + get(interval).getSize();
+
+ // sync all mmap containers
+ content.sync();
+ times.sync();
+ index.sync();
+}
+
+// see checkpoint.hpp for a more commented analogon of this method
+template<typename Payload>
+void Heap<Payload>::addCheckpoints(Time t) {
+ assert(index.barrierWrite() == times.barrierWrite()); // is a pre- and postcondition
+
+ while (t > timeLimit) {
+ content.barrierWrite() = index(interval) + get(interval).getSize(); // update current payload size
+ interval++;
+ index(interval) = content.barrierWrite();
+ times(interval) = times(interval - 1);
+
+ // invoke copy constructor via placement new to copy the payload
+ new(&get(interval)) Payload(get(interval-1));
+
+ index.barrierWrite()++;
+ times.barrierWrite()++;
+ times.barrierRead() = times.barrierWrite() - 1;
+ index.barrierRead() = index.barrierWrite() - 1;
+ content.barrierRead() = content.barrierWrite();
+
+ timeLimit = checkpointInterval * interval;
+ }
+
+ currentTime = t;
+ times(interval) = t;
+
+ assert(index.barrierWrite() == times.barrierWrite());
+
+ content.advise(0, content.barrierRead(), MADV_DONTNEED);
+}
+
+template<typename Payload>
+inline Payload& Heap<Payload>::get (ptr_t id) {
+ return *((Payload*) (&(content(index(id)))));
+}
+#endif // lxZLmAQ0qUwCFC9Ehehzy92Ldko
contact: Jan Huwald // Impressum