67 changes: 67 additions & 0 deletions benchmark/http/bench-parser-fragmented.js
@@ -0,0 +1,67 @@
'use strict';

const common = require('../common');

const bench = common.createBenchmark(main, {
  len: [8, 16],
  frags: [2, 4, 8],
  n: [1e5],
}, {
  flags: ['--expose-internals', '--no-warnings'],
});

function main({ len, frags, n }) {
  const { HTTPParser } = common.binding('http_parser');
  const REQUEST = HTTPParser.REQUEST;
  const kOnHeaders = HTTPParser.kOnHeaders | 0;
  const kOnHeadersComplete = HTTPParser.kOnHeadersComplete | 0;
  const kOnBody = HTTPParser.kOnBody | 0;
  const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0;

  function processHeaderFragmented(fragments, n) {
    const parser = newParser(REQUEST);

    bench.start();
    for (let i = 0; i < n; i++) {
      // Send header in fragments
      for (const frag of fragments) {
        parser.execute(frag, 0, frag.length);
      }
      parser.initialize(REQUEST, {});
    }
    bench.end(n);
  }

  function newParser(type) {
    const parser = new HTTPParser();
    parser.initialize(type, {});

    parser.headers = [];

    parser[kOnHeaders] = function() { };
    parser[kOnHeadersComplete] = function() { };
    parser[kOnBody] = function() { };
    parser[kOnMessageComplete] = function() { };

    return parser;
  }

  // Build the header
  let header = `GET /hello HTTP/1.1\r\nContent-Type: text/plain\r\n`;

  for (let i = 0; i < len; i++) {
    header += `X-Filler${i}: ${Math.random().toString(36).substring(2)}\r\n`;
  }
  header += '\r\n';

  // Split header into fragments
  const headerBuf = Buffer.from(header);
  const fragSize = Math.ceil(headerBuf.length / frags);
  const fragments = [];

  for (let i = 0; i < headerBuf.length; i += fragSize) {
    fragments.push(headerBuf.slice(i, Math.min(i + fragSize, headerBuf.length)));
  }

  processHeaderFragmented(fragments, n);
}
140 changes: 91 additions & 49 deletions src/node_http_parser.cc
@@ -122,72 +122,116 @@ class BindingData : public BaseObject {
  SET_MEMORY_INFO_NAME(BindingData)
};

// helper class for the Parser
struct StringPtr {
  StringPtr() {
    on_heap_ = false;
    Reset();
  }
class Parser;

class StringPtrAllocator {
 public:
  // Memory impact: ~8KB per parser (66 StringPtr × 128 bytes).
  static constexpr size_t kSlabSize = 8192;

  StringPtrAllocator() = default;

  ~StringPtr() {
    Reset();
  // Allocate memory from the slab. Returns nullptr if full.
  char* TryAllocate(size_t size) {
    if (length_ + size > kSlabSize) {
      return nullptr;
    }
    char* ptr = buffer_ + length_;
    length_ += size;
    return ptr;
  }

  // Check if pointer is within this allocator's buffer.
  bool Contains(const char* ptr) const {
    return ptr >= buffer_ && ptr < buffer_ + kSlabSize;
  }

  // Reset allocator for new message.
  void Reset() { length_ = 0; }

  // If str_ does not point to a heap string yet, this function makes it do
 private:
  char buffer_[kSlabSize];
  size_t length_ = 0;
};

struct StringPtr {
  StringPtr() = default;
  ~StringPtr() { Reset(); }

  StringPtr(const StringPtr&) = delete;
  StringPtr& operator=(const StringPtr&) = delete;

  // If str_ does not point to owned storage yet, this function makes it do
  // so. This is called at the end of each http_parser_execute() so as not
  // to leak references. See issue #2438 and test-http-parser-bad-ref.js.
  void Save() {
    if (!on_heap_ && size_ > 0) {
      char* s = new char[size_];
      memcpy(s, str_, size_);
      str_ = s;
      on_heap_ = true;
  void Save(StringPtrAllocator* allocator) {
    if (str_ == nullptr || on_heap_ ||
        (allocator != nullptr && allocator->Contains(str_))) {
      return;
    }
    // Try allocator first, fall back to heap
    if (allocator != nullptr) {
      char* ptr = allocator->TryAllocate(size_);
      if (ptr != nullptr) {
        memcpy(ptr, str_, size_);
        str_ = ptr;
        return;
      }
    }
    char* s = new char[size_];
    memcpy(s, str_, size_);
    str_ = s;
    on_heap_ = true;
  }


  void Reset() {
    if (on_heap_) {
      delete[] str_;
      on_heap_ = false;
    }

    str_ = nullptr;
    size_ = 0;
  }


  void Update(const char* str, size_t size) {
  void Update(const char* str, size_t size, StringPtrAllocator* allocator) {
    if (str_ == nullptr) {
      str_ = str;
    } else if (on_heap_ || str_ + size_ != str) {
      // Non-consecutive input, make a copy on the heap.
      // TODO(bnoordhuis) Use slab allocation, O(n) allocs is bad.
      char* s = new char[size_ + size];
      memcpy(s, str_, size_);
      memcpy(s + size_, str, size);

      if (on_heap_)
        delete[] str_;
      else
        on_heap_ = true;
    } else if (on_heap_ ||
               (allocator != nullptr && allocator->Contains(str_)) ||
               str_ + size_ != str) {
      // Non-consecutive input, make a copy
      const size_t new_size = size_ + size;
      char* new_str = nullptr;

      // Try allocator first (if not already on heap)
      if (!on_heap_ && allocator != nullptr) {
        new_str = allocator->TryAllocate(new_size);
      }

      str_ = s;
      if (new_str != nullptr) {
        memcpy(new_str, str_, size_);
        memcpy(new_str + size_, str, size);
        str_ = new_str;
      } else {
        // Fall back to heap
        char* s = new char[new_size];
        memcpy(s, str_, size_);
        memcpy(s + size_, str, size);
        if (on_heap_) delete[] str_;
        str_ = s;
        on_heap_ = true;
      }
    }
    size_ += size;
  }


  Local<String> ToString(Environment* env) const {
    if (size_ != 0)
      return OneByteString(env->isolate(), str_, size_);
    else
      return String::Empty(env->isolate());
  }


  // Strip trailing OWS (SPC or HTAB) from string.
  Local<String> ToTrimmedString(Environment* env) {
    while (size_ > 0 && IsOWS(str_[size_ - 1])) {
@@ -196,14 +240,11 @@ struct StringPtr {
    return ToString(env);
  }


  const char* str_;
  bool on_heap_;
  size_t size_;
  const char* str_ = nullptr;
  bool on_heap_ = false;
  size_t size_ = 0;
};

class Parser;

struct ParserComparator {
  bool operator()(const Parser* lhs, const Parser* rhs) const;
};
@@ -259,8 +300,7 @@ class Parser : public AsyncWrap, public StreamListener {
      : AsyncWrap(binding_data->env(), wrap),
        current_buffer_len_(0),
        current_buffer_data_(nullptr),
        binding_data_(binding_data) {
  }
        binding_data_(binding_data) {}

  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(Parser)
@@ -278,6 +318,7 @@ class Parser : public AsyncWrap, public StreamListener {
    headers_completed_ = false;
    chunk_extensions_nread_ = 0;
    last_message_start_ = uv_hrtime();
    allocator_.Reset();
    url_.Reset();
    status_message_.Reset();

@@ -308,7 +349,7 @@ class Parser : public AsyncWrap, public StreamListener {
      return rv;
    }

    url_.Update(at, length);
    url_.Update(at, length, &allocator_);
    return 0;
  }

@@ -319,7 +360,7 @@ class Parser : public AsyncWrap, public StreamListener {
      return rv;
    }

    status_message_.Update(at, length);
    status_message_.Update(at, length, &allocator_);
    return 0;
  }

@@ -345,7 +386,7 @@ class Parser : public AsyncWrap, public StreamListener {
    CHECK_LT(num_fields_, kMaxHeaderFieldsCount);
    CHECK_EQ(num_fields_, num_values_ + 1);

    fields_[num_fields_ - 1].Update(at, length);
    fields_[num_fields_ - 1].Update(at, length, &allocator_);

    return 0;
  }
@@ -366,7 +407,7 @@ class Parser : public AsyncWrap, public StreamListener {
    CHECK_LT(num_values_, arraysize(values_));
    CHECK_EQ(num_values_, num_fields_);

    values_[num_values_ - 1].Update(at, length);
    values_[num_values_ - 1].Update(at, length, &allocator_);

    return 0;
  }
@@ -594,15 +635,15 @@ class Parser : public AsyncWrap, public StreamListener {
  }

  void Save() {
    url_.Save();
    status_message_.Save();
    url_.Save(&allocator_);
    status_message_.Save(&allocator_);

    for (size_t i = 0; i < num_fields_; i++) {
      fields_[i].Save();
      fields_[i].Save(&allocator_);
    }

    for (size_t i = 0; i < num_values_; i++) {
      values_[i].Save();
      values_[i].Save(&allocator_);
    }
  }

@@ -1006,6 +1047,7 @@ class Parser : public AsyncWrap, public StreamListener {


  llhttp_t parser_;
  StringPtrAllocator allocator_;  // shared slab for all StringPtrs
  StringPtr fields_[kMaxHeaderFieldsCount];  // header fields
  StringPtr values_[kMaxHeaderFieldsCount];  // header values
  StringPtr url_;
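
The core of the C++ change is easier to see outside the diff. Below is a minimal, self-contained sketch of the same pattern: a fixed-size slab that bump-allocates storage for the header strings of one message, a Contains() check so slab-backed data is never copied a second time, a plain heap fallback once the slab is full, and a wholesale Reset() when the next message starts. The names used here (Slab, Piece, Append) are illustrative only and are not the PR's identifiers, and this sketch always copies on append, whereas the real StringPtr::Update keeps pointing into the input buffer while incoming fragments stay contiguous.

// sketch.cc -- simplified illustration, not the PR's code
#include <cstdio>
#include <cstring>

class Slab {
 public:
  static constexpr size_t kSize = 8192;

  // Bump-allocate from the fixed buffer; nullptr means "fall back to heap".
  char* TryAllocate(size_t n) {
    if (used_ + n > kSize) return nullptr;
    char* p = buf_ + used_;
    used_ += n;
    return p;
  }

  // True if p points into this slab (so it never needs a second copy).
  bool Contains(const char* p) const { return p >= buf_ && p < buf_ + kSize; }

  // Reclaims everything at once at the start of the next message.
  void Reset() { used_ = 0; }

 private:
  char buf_[kSize];
  size_t used_ = 0;
};

// Accumulates fragments of one header token.
struct Piece {
  const char* data = nullptr;
  size_t size = 0;
  bool on_heap = false;

  void Append(const char* at, size_t len, Slab* slab) {
    char* dst = on_heap ? nullptr : slab->TryAllocate(size + len);
    if (dst != nullptr) {
      if (size > 0) std::memcpy(dst, data, size);
      std::memcpy(dst + size, at, len);
      data = dst;
    } else {
      // Slab exhausted (or already on the heap): plain heap fallback.
      char* heap = new char[size + len];
      if (size > 0) std::memcpy(heap, data, size);
      std::memcpy(heap + size, at, len);
      if (on_heap) delete[] data;
      data = heap;
      on_heap = true;
    }
    size += len;
  }

  ~Piece() {
    if (on_heap) delete[] data;
  }
};

int main() {
  Slab slab;
  Piece value;
  // One header value arriving in two reads, as in the fragmented benchmark.
  value.Append("text/", 5, &slab);
  value.Append("plain", 5, &slab);
  std::printf("%.*s\n", static_cast<int>(value.size), value.data);
  slab.Reset();  // next message reuses the same 8 KB buffer
  return 0;
}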