1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
#ifndef lxZLmAQ0qUwCFC9Ehehzy92Ldko
#define lxZLmAQ0qUwCFC9Ehehzy92Ldko
/* warn: heap is sloppy with the write barrier of its content
mmap()ed data store; the stated barrier is always equal to or less
than the actually used amount of space; it is only correct after
sync() calls (which happen implicitly at interval turnover) */
#include <inttypes.h>
#include <boost/tuple/tuple.hpp>
#include "checkpoint.hpp"
#include "everything_else.hpp"
#include "scalar.hpp"
#include "string_helpers.hpp"
#include "time.hpp"
#include "vector.hpp"
// Heap stores one Payload per checkpoint interval, packed back to back
// into a single char vector (`content`). `index` holds the start offset of
// every payload inside `content`, `times` the time stamp that belongs to
// it. Payloads are expected to report their own size via getSize().
template<typename Payload>
struct Heap {
	// Constructors:
	//   Heap(name, avgLength)
	//     name      - used for file name creation ("heap_times_" + name, ...)
	//     avgLength - average size of a contained object; scales the size
	//                 requested for the content vector
	//   Heap()
	//     derives name and size from Payload::payload_t::quant_t
	// A newly created heap starts with one default-constructed Payload.
	Heap(const string name, const uint64_t avgLength);
	Heap();

	// read & write access: returns the payload of the interval containing
	// t, moving the heap forward first if t lies beyond the current time.
	//
	// Intentionally NOT declared __attribute__((pure)): the call updates
	// currentTime / times and may append checkpoints. A pure declaration
	// would allow the compiler to merge or drop calls and to keep stale
	// copies of the members it modifies.
	inline Payload & operator() (Time t);

	// Refreshes the write barrier of `content` and syncs all three vectors.
	void sync();

	// intern methods
	void addCheckpoints(Time t);

	// intern data
	typedef Vector<char>::ptr_t ptr_t;
	ptr_t interval;      // current interval (number)
	Time  timeLimit;     // current interval's time limit (inclusive)
	Time  currentTime;   // current interval's time (same as times(interval),
	                     // but allows the compiler to perform better
	                     // optimizations)

	// file backings
	Vector<Time>  times;    // times of the payloads
	Vector<char>  content;  // the payloads (in temporally ascending order)
	Vector<ptr_t> index;    // start offset of each payload within content

private:
	// Payload stored for checkpoint `id`; only reads index and content.
	inline Payload & get(ptr_t id) __attribute__ ((pure));

	// Shared constructor tail: creates a fresh heap or restores state from
	// the existing backing files.
	void init();
};
// Builds the three backing vectors from file names derived from `name`
// and hands over to init() for the create-or-restore decision.
//   name      - suffix appended to "heap_times_", "heap_content_" and
//               "heap_index_"
//   avgLength - average payload size; multiplied with maxCheckpoints to
//               size the content vector
template<typename Payload>
Heap<Payload>::Heap(const string name, const uint64_t avgLength)
	: times  ("heap_times_"   + name, maxCheckpoints)
	, content("heap_content_" + name, maxCheckpoints * avgLength)
	, index  ("heap_index_"   + name, maxCheckpoints)
{
	this->init();
}
// Default construction: file names and content size are taken from the
// payload's quantity type (Payload::payload_t::quant_t) instead of being
// passed in. The content vector is sized as
//   maxCheckpoints * AverageQueueSize<quant_t>::value * Payload::elementSize.
template<typename Payload>
Heap<Payload>::Heap()
	: times(
		"heap_times_" + std::string(Payload::payload_t::quant_t::name),
		maxCheckpoints)
	, content(
		"heap_content_" + std::string(Payload::payload_t::quant_t::name),
		maxCheckpoints
			* AverageQueueSize<typename Payload::payload_t::quant_t>::value
			* Payload::elementSize)
	, index(
		"heap_index_" + std::string(Payload::payload_t::quant_t::name),
		maxCheckpoints)
{
	this->init();
}
// Common tail of both constructors.
//
// An index write barrier of 0 means the heap has never held a checkpoint:
// interval #0 is then created with time 0 and a default-constructed
// Payload. Otherwise the in-memory cursor (interval, timeLimit,
// currentTime) is rebuilt from what the backing vectors already contain.
template<typename Payload>
void Heap<Payload>::init() {
	// do we create the heap (and backing files)?
	if (index.barrierWrite() == 0) {
		// yes -> extra initialisation
		interval    = 0;
		timeLimit   = 0;
		currentTime = 0;

		times(0) = 0;
		times.barrierWrite() = 1;

		// Publish the payload offset BEFORE the first get(0): get() finds
		// the payload through index(0), so reading it ahead of this store
		// would only work if the backing store happened to be zero-filled.
		index(0) = 0;

		new(&(content(0))) Payload();  // init with empty ctor
		content.barrierWrite() = get(0).getSize();

		// set last, after the first entry is completely in place
		index.barrierWrite() = 1;
	} else {
		// no -> restore previous setup
		interval    = index.barrierWrite() - 1;
		timeLimit   = Time(checkpointInterval) * interval;  // hint: interval #0 ends with time 0
		currentTime = times(interval);
	}
}
// Access to the payload that is valid at time t.
//
// A request that does not lie after currentTime leaves the heap untouched.
// Otherwise the heap is moved forward to t (appending checkpoints if t is
// past the current interval's limit) before the payload of the then
// current interval is returned.
template<typename Payload>
inline Payload& Heap<Payload>::operator() (Time t) {
	// nothing to record unless the request moves time forward
	if (!(t > currentTime))
		return get(interval);

	// crossing the interval limit is the rare case
	if (unlikely(t > timeLimit))
		addCheckpoints(t);

	currentTime = t;
	times(interval) = t;
	return get(interval);
}
// Brings the write barrier of `content` up to the end of the current
// payload and then syncs the three backing vectors.
template<typename Payload>
void Heap<Payload>::sync() {
	// The used length of the heap ends where the current payload ends.
	// WARN: for synchronization this value actually should never shrink,
	//       but the embedded container might violate this.
	Payload &current = get(interval);
	content.barrierWrite() = index(interval) + current.getSize();

	// sync all mmap containers (content first, then times, then index)
	content.sync();
	times.sync();
	index.sync();
}
// Moves the heap forward to time t, appending one checkpoint for every
// interval limit that t lies beyond. See checkpoint.hpp for a more
// thoroughly commented analogue of this method.
//
// Each new checkpoint is a copy of its predecessor's payload, placed
// directly behind it in `content`; `index` and `times` receive a matching
// entry. On return `interval` denotes the newest checkpoint,
// currentTime == t and times(interval) == t.
template<typename Payload>
void Heap<Payload>::addCheckpoints(Time t) {
assert(index.barrierWrite() == times.barrierWrite()); // is a pre- and postcondition
while (t > timeLimit) {
// fix the end of the current payload; the next payload starts right there
content.barrierWrite() = index(interval) + get(interval).getSize(); // update current payload size
interval++;
index(interval) = content.barrierWrite();
// the new checkpoint starts out with its predecessor's time stamp
times(interval) = times(interval - 1);
// invoke copy constructor via placement new to copy the payload
new(&get(interval)) Payload(get(interval-1));
// one more entry in both bookkeeping vectors (keeps the asserted invariant)
index.barrierWrite()++;
times.barrierWrite()++;
// read barriers trail the write barriers by one entry for times/index;
// for content the read barrier is the offset where the newest payload starts
times.barrierRead() = times.barrierWrite() - 1;
index.barrierRead() = index.barrierWrite() - 1;
content.barrierRead() = content.barrierWrite();
// interval #n ends (inclusively) at n * checkpointInterval
timeLimit = checkpointInterval * interval;
}
currentTime = t;
times(interval) = t;
assert(index.barrierWrite() == times.barrierWrite());
// NOTE(review): presumably forwards to madvise() so that everything in
// front of the newest payload may be dropped from memory - confirm
// against Vector::advise.
content.advise(0, content.barrierRead(), MADV_DONTNEED);
}
// Returns the payload of checkpoint `id`: index(id) is the offset of its
// first byte inside `content`, which is reinterpreted as a Payload.
template<typename Payload>
inline Payload& Heap<Payload>::get (ptr_t id) {
	const ptr_t offset = index(id);
	return *reinterpret_cast<Payload*>(&content(offset));
}
#endif // lxZLmAQ0qUwCFC9Ehehzy92Ldko
|