summaryrefslogtreecommitdiff
path: root/core/heap.hpp
blob: 40c0a555bb71f7eac872f296bfa786ec424815b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#ifndef lxZLmAQ0qUwCFC9Ehehzy92Ldko
#define lxZLmAQ0qUwCFC9Ehehzy92Ldko

/* warn: heap is sloppy with the write barrier of it's content
   mmap()ed data store; the stated barrier is always equal or less
   than the actually used amount of space; it is only correct after
   sync() calls (which happen implicitely at interval turnover */

#include <inttypes.h>
#include <boost/tuple/tuple.hpp>

#include "checkpoint.hpp"
#include "everything_else.hpp"
#include "scalar.hpp"
#include "string_helpers.hpp"
#include "time.hpp"
#include "vector.hpp"

template<typename Payload>
struct Heap {
  // constructor requires a 3-tuple:
  //   1. name (used for file name creation)
  //   2. average size of contained object
  //   3. default payload value (if a new heap is created)
  Heap(const string name, const uint64_t avgLength);
  Heap();

  // read & write access
  inline Payload & operator() (Time t) __attribute__ ((pure));
  void sync();

  // intern methods
  void addCheckpoints(Time t);

  // intern data
  typedef Vector<char>::ptr_t ptr_t;
  ptr_t interval; // current interval (number)
  Time timeLimit; // current intervals time limit (inclusive)
  Time currentTime; // current intervals time (same as times(interval)
		    // but allows compiler to perform better
		    // optimizations)


  // file backings
  Vector<Time> times; // times of the payload
  Vector<char> content; // the payloads (in temporally ascending order)
  Vector<ptr_t> index;

private:
  inline Payload & get(ptr_t id) __attribute__ ((pure));
  void init();
};

template<typename Payload>
Heap<Payload>::Heap(const string name, const uint64_t avgLength) :
  times(  "heap_times_"   + name, maxCheckpoints),
  content("heap_content_" + name, maxCheckpoints * avgLength),
  index(  "heap_index_"   + name, maxCheckpoints)
{
  init();
}

template<typename Payload>
Heap<Payload>::Heap() :
  times(  "heap_times_"   + std::string(Payload::payload_t::quant_t::name), 
	  maxCheckpoints),
  content("heap_content_" + std::string(Payload::payload_t::quant_t::name), 
	  maxCheckpoints  * AverageQueueSize<typename Payload::payload_t::quant_t>::value * Payload::elementSize),
  index(  "heap_index_"   + std::string(Payload::payload_t::quant_t::name), 
	  maxCheckpoints)
{
  init();
}

template<typename Payload>
void Heap<Payload>::init() {
  // do we create the heap (and backing files)?
  if (index.barrierWrite() == 0) {
    // yes -> extra initialisation
    interval = 0;    
    timeLimit = 0;
    currentTime = 0;
    times(0) = 0;
    times.barrierWrite() = 1;
    //     new(&(content(0))) Payload(boost::get<2>(arg)); // init with given arg
    new(&(content(0))) Payload(); // init with empty ctor
    content.barrierWrite() = get(0).getSize();
   index(0) = 0;
    index.barrierWrite() = 1;
  }else{
    // no -> restore previous setup
    interval = index.barrierWrite() - 1;
    timeLimit = Time(checkpointInterval) * interval; // hint interval #0 ends with time 0
    currentTime = times(interval);
  }
}


template<typename Payload>
inline Payload& Heap<Payload>::operator() (Time t) {
  if (t > currentTime) {
    if (unlikely(t > timeLimit)) addCheckpoints(t);
    currentTime = t;
    times(interval) = t;
  }
  return get(interval);
}

template<typename Payload>
void Heap<Payload>::sync() {
  // update length of the current heap
  // WARN: for synchronization these values actually should never
  // shrink, but the embedded container might violate this
  content.barrierWrite() = index(interval) + get(interval).getSize();

  // sync all mmap containers
  content.sync();
  times.sync();
  index.sync();
}

// see checkpoint.hpp for a more commented analogon of this method
template<typename Payload>
void Heap<Payload>::addCheckpoints(Time t) {
  assert(index.barrierWrite() == times.barrierWrite()); // is a pre- and postcondition

  while (t > timeLimit) {
    content.barrierWrite() = index(interval) + get(interval).getSize(); // update current payload size
    interval++;
    index(interval) = content.barrierWrite();
    times(interval) = times(interval - 1);

    // invoke copy constructor via placement new to copy the payload
    new(&get(interval)) Payload(get(interval-1));

    index.barrierWrite()++;
    times.barrierWrite()++;
    times.barrierRead() = times.barrierWrite() - 1;
    index.barrierRead() = index.barrierWrite() - 1;
    content.barrierRead() = content.barrierWrite();
    
    timeLimit = checkpointInterval * interval;
  }

  currentTime = t;
  times(interval) = t;

  assert(index.barrierWrite() == times.barrierWrite());

  content.advise(0, content.barrierRead(), MADV_DONTNEED);
}

template<typename Payload>
inline Payload& Heap<Payload>::get (ptr_t id) {
  return *((Payload*) (&(content(index(id)))));
}
#endif // lxZLmAQ0qUwCFC9Ehehzy92Ldko
contact: Jan Huwald // Impressum