diff --git a/base/base.pro b/base/base.pro index 6071d8da79..005806318d 100644 --- a/base/base.pro +++ b/base/base.pro @@ -9,74 +9,76 @@ include($$ROOT_DIR/common.pri) SOURCES += \ base.cpp \ - logging.cpp \ - thread.cpp \ - string_utils.cpp \ commands_queue.cpp \ - shared_buffer_manager.cpp \ condition.cpp \ + exception.cpp \ + fence_manager.cpp \ + internal/message.cpp \ + logging.cpp \ lower_case.cpp \ normalize_unicode.cpp \ - runner.cpp \ - timer.cpp \ - internal/message.cpp \ - exception.cpp \ - threaded_container.cpp \ - resource_pool.cpp \ - fence_manager.cpp \ - strings_bundle.cpp \ - string_format.cpp \ object_tracker.cpp \ - scheduled_task.cpp \ - thread_pool.cpp \ pseudo_random.cpp \ - src_point.cpp + resource_pool.cpp \ + runner.cpp \ + scheduled_task.cpp \ + shared_buffer_manager.cpp \ + src_point.cpp \ + string_format.cpp \ + string_utils.cpp \ + strings_bundle.cpp \ + thread.cpp \ + thread_pool.cpp \ + threaded_container.cpp \ + timer.cpp \ HEADERS += \ SRC_FIRST.hpp \ + array_adapters.hpp \ assert.hpp \ - const_helper.hpp \ - internal/fast_mutex.hpp \ - math.hpp \ - pseudo_random.hpp \ - scope_guard.hpp \ - macros.hpp \ base.hpp \ - src_point.hpp \ bits.hpp \ + buffer_vector.hpp \ + cache.hpp \ + commands_queue.hpp \ + condition.hpp \ + const_helper.hpp \ exception.hpp \ - internal/message.hpp \ + fence_manager.hpp \ internal/fast_mutex.hpp \ + internal/fast_mutex.hpp \ + internal/message.hpp \ + limited_priority_queue.hpp \ logging.hpp \ + macros.hpp \ + math.hpp \ + matrix.hpp \ + mem_trie.hpp \ + mru_cache.hpp \ + mutex.hpp \ + object_tracker.hpp \ + pseudo_random.hpp \ + regexp.hpp \ + resource_pool.hpp \ + rolling_hash.hpp \ + runner.hpp \ + scheduled_task.hpp \ + scope_guard.hpp \ + set_operations.hpp \ + shared_buffer_manager.hpp \ + src_point.hpp \ + stats.hpp \ + std_serialization.hpp \ + stl_add.hpp \ + stl_iterator.hpp \ + string_format.hpp \ + string_utils.hpp \ + strings_bundle.hpp \ swap.hpp \ thread.hpp \ - mutex.hpp \ - string_utils.hpp \ - rolling_hash.hpp \ - stl_add.hpp \ - timer.hpp \ - cache.hpp \ - matrix.hpp \ - set_operations.hpp \ - condition.hpp \ - commands_queue.hpp \ - stats.hpp \ - shared_buffer_manager.hpp \ - buffer_vector.hpp \ - array_adapters.hpp \ - runner.hpp \ - mru_cache.hpp \ + thread_pool.hpp \ threaded_container.hpp \ threaded_list.hpp \ - resource_pool.hpp \ - limited_priority_queue.hpp \ threaded_priority_queue.hpp \ - std_serialization.hpp \ - fence_manager.hpp \ - strings_bundle.hpp \ - string_format.hpp \ - object_tracker.hpp \ - regexp.hpp \ - scheduled_task.hpp \ - thread_pool.hpp \ - stl_iterator.hpp \ + timer.hpp \ + worker_thread.hpp \ diff --git a/base/base_tests/base_tests.pro b/base/base_tests/base_tests.pro index 25cb544cb3..b1e8ae8a89 100644 --- a/base/base_tests/base_tests.pro +++ b/base/base_tests/base_tests.pro @@ -14,32 +14,31 @@ win32-g++: LIBS += -lpthread SOURCES += \ ../../testing/testingmain.cpp \ - const_helper.cpp \ - math_test.cpp \ - scope_guard_test.cpp \ - bits_test.cpp \ - logging_test.cpp \ - threads_test.cpp \ - rolling_hash_test.cpp \ - cache_test.cpp \ - stl_add_test.cpp \ - string_utils_test.cpp \ - matrix_test.cpp \ - commands_queue_test.cpp \ - buffer_vector_test.cpp \ assert_test.cpp \ - timer_test.cpp \ - mru_cache_test.cpp \ - threaded_list_test.cpp \ + bits_test.cpp \ + buffer_vector_test.cpp \ + cache_test.cpp \ + commands_queue_test.cpp \ condition_test.cpp \ + const_helper.cpp \ containers_test.cpp \ fence_manager_test.cpp \ - string_format_test.cpp \ + logging_test.cpp \ + math_test.cpp \ + matrix_test.cpp \ + mem_trie_test.cpp \ + mru_cache_test.cpp \ regexp_test.cpp \ + rolling_hash_test.cpp \ scheduled_task_test.cpp \ - thread_pool_tests.cpp + scope_guard_test.cpp \ + stl_add_test.cpp \ + string_format_test.cpp \ + string_utils_test.cpp \ + thread_pool_tests.cpp \ + threaded_list_test.cpp \ + threads_test.cpp \ + timer_test.cpp \ + worker_thread_test.cpp \ HEADERS += - - - diff --git a/base/base_tests/mem_trie_test.cpp b/base/base_tests/mem_trie_test.cpp new file mode 100644 index 0000000000..d4634dd950 --- /dev/null +++ b/base/base_tests/mem_trie_test.cpp @@ -0,0 +1,31 @@ +#include "../../testing/testing.hpp" + +#include "../mem_trie.hpp" + +#include "../../std/algorithm.hpp" +#include "../../std/string.hpp" +#include "../../std/utility.hpp" +#include "../../std/vector.hpp" + +UNIT_TEST(MemTrie_Basic) +{ + vector> data = {{"roger", 3}, + {"amy", 1}, + {"emma", 1}, + {"ann", 1}, + {"rob", 1}, + {"roger", 2}, + {"", 0}, + {"roger", 1}}; + my::MemTrie trie; + for (auto const & p : data) + trie.Add(p.first, p.second); + + vector> ordered_data; + trie.ForEach([&ordered_data](string const & k, int v) + { + ordered_data.emplace_back(k, v); + }); + sort(data.begin(), data.end()); + TEST_EQUAL(data, ordered_data, ()); +} diff --git a/base/base_tests/worker_thread_test.cpp b/base/base_tests/worker_thread_test.cpp new file mode 100644 index 0000000000..51f800b21c --- /dev/null +++ b/base/base_tests/worker_thread_test.cpp @@ -0,0 +1,31 @@ +#include "../../testing/testing.hpp" + +#include "../worker_thread.hpp" + +#include "../../std/memory.hpp" +#include "../../std/vector.hpp" + +struct Task +{ + Task(vector & buffer, int index) : m_buffer(buffer), m_index(index) {} + + void operator()() const { m_buffer.push_back(m_index); } + + vector & m_buffer; + int m_index; +}; + +UNIT_TEST(WorkerThread_Basic) +{ + my::WorkerThread thread(5 /* maxTasks */); + + vector buffer; + + for (int i = 0; i < 10; ++i) + thread.Push(make_shared(buffer, i)); + thread.RunUntilIdleAndStop(); + + TEST_EQUAL(static_cast(10), buffer.size(), ()); + for (size_t i = 0; i < buffer.size(); ++i) + TEST_EQUAL(static_cast(i), buffer[i], ()); +} diff --git a/base/macros.hpp b/base/macros.hpp index 573cae3bab..858b547c58 100644 --- a/base/macros.hpp +++ b/base/macros.hpp @@ -66,4 +66,9 @@ namespace my #define UINT64_LO(x) (static_cast(x & 0xFFFFFFFF)) #define UINT64_HI(x) (static_cast(x >> 32)) - +#define DISALLOW_COPY_AND_MOVE(className) \ +private: \ + className(const className &) = delete; \ + className(className &&) = delete; \ + className & operator=(const className &) = delete; \ + className & operator=(className &&) = delete; diff --git a/base/mem_trie.hpp b/base/mem_trie.hpp new file mode 100644 index 0000000000..27c3c4cc90 --- /dev/null +++ b/base/mem_trie.hpp @@ -0,0 +1,91 @@ +#pragma once + +#include "macros.hpp" + +#include "../std/map.hpp" +#include "../std/vector.hpp" + +namespace my +{ +/// This class is a simple in-memory trie which allows to add +/// key-value pairs and then traverse them in a sorted order. +template +class MemTrie +{ +public: + MemTrie() = default; + + /// Adds a key-value pair to the trie. + void Add(StringT const & key, ValueT const & value) + { + Node * cur = &m_root; + for (auto const & c : key) + cur = cur->GetMove(c); + cur->AddValue(value); + } + + /// Traverses all key-value pairs in the trie in a sorted order. + /// + /// \param toDo A callable object that will be called on an each + /// key-value pair. + template + void ForEach(ToDo const & toDo) + { + StringT prefix; + ForEach(&m_root, prefix, toDo); + } + +private: + struct Node + { + using CharT = typename StringT::value_type; + using MovesMap = map; + + Node() = default; + + ~Node() + { + for (auto const & move : m_moves) + delete move.second; + } + + Node * GetMove(CharT const & c) + { + Node ** node = &m_moves[c]; + if (*node) + return *node; + *node = new Node(); + return *node; + } + + void AddValue(const ValueT & value) { m_values.push_back(value); } + + MovesMap m_moves; + vector m_values; + + DISALLOW_COPY_AND_MOVE(Node); + }; + + template + void ForEach(Node * root, StringT & prefix, ToDo const & toDo) + { + if (!root->m_values.empty()) + { + sort(root->m_values.begin(), root->m_values.end()); + for (auto const & value : root->m_values) + toDo(prefix, value); + } + + for (auto const & move : root->m_moves) + { + prefix.push_back(move.first); + ForEach(move.second, prefix, toDo); + prefix.pop_back(); + } + } + + Node m_root; + + DISALLOW_COPY_AND_MOVE(MemTrie); +}; // class MemTrie +} // namespace my diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp new file mode 100644 index 0000000000..220d398b8a --- /dev/null +++ b/base/worker_thread.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include "macros.hpp" + +#include "../std/condition_variable.hpp" +#include "../std/memory.hpp" +#include "../std/mutex.hpp" +#include "../std/queue.hpp" +#include "../std/thread.hpp" + +namespace my +{ +/// This class wraps a sequential worker thread, that performs tasks +/// one-by-one. This class is not thread-safe, so, it should be +/// instantiated, used and destroyed on the same thread. +template +class WorkerThread +{ +public: + WorkerThread(int maxTasks) + : m_maxTasks(maxTasks), m_shouldFinish(false), m_workerThread(&WorkerThread::Worker, this) + { + } + + ~WorkerThread() { CHECK(!m_workerThread.joinable(), ()); } + + /// Pushes new task into worker thread's queue. If the queue is + /// full, current thread is blocked. + /// + /// \param task A callable object that will be called by worker thread. + void Push(shared_ptr task) + { + CHECK(m_workerThread.joinable(), ()); + unique_lock lock(m_mutex); + m_condNotFull.wait(lock, [this]() + { + return m_tasks.size() < m_maxTasks; + }); + m_tasks.push(task); + m_condNonEmpty.notify_one(); + } + + /// Runs worker thread until it'll become idle. After that, + /// terminates worker thread. + void RunUntilIdleAndStop() + { + { + lock_guard lock(m_mutex); + m_shouldFinish = true; + m_condNonEmpty.notify_one(); + } + m_workerThread.join(); + } + +private: + void Worker() + { + shared_ptr task; + while (true) + { + { + unique_lock lock(m_mutex); + m_condNonEmpty.wait(lock, [this]() + { + return m_shouldFinish || !m_tasks.empty(); + }); + if (m_shouldFinish && m_tasks.empty()) + break; + task = m_tasks.front(); + m_tasks.pop(); + m_condNotFull.notify_one(); + } + (*task)(); + } + } + + /// Maximum number of tasks in the queue. + int const m_maxTasks; + queue> m_tasks; + + /// When true, worker thread should finish all tasks in the queue + /// and terminate. + bool m_shouldFinish; + + mutex m_mutex; + condition_variable m_condNotFull; + condition_variable m_condNonEmpty; + thread m_workerThread; + + DISALLOW_COPY_AND_MOVE(WorkerThread); +}; // class WorkerThread +} // namespace my diff --git a/indexer/string_file.cpp b/indexer/string_file.cpp index 1d3c976c07..6bfb1ee8c7 100644 --- a/indexer/string_file.cpp +++ b/indexer/string_file.cpp @@ -1,11 +1,5 @@ #include "string_file.hpp" -#include "../coding/read_write_utils.hpp" -#include "../coding/file_reader.hpp" -#include "../coding/file_writer.hpp" - -#include "../base/logging.hpp" - #include "../std/algorithm.hpp" #include "../std/bind.hpp" @@ -79,26 +73,16 @@ void StringsFile::IteratorT::increment() } StringsFile::StringsFile(string const & fPath) + : m_workerThread(1 /* maxTasks */) { m_writer.reset(new FileWriter(fPath)); } void StringsFile::Flush() { - // store starting offset - uint64_t const pos = m_writer->Pos(); - m_offsets.push_back(make_pair(pos, pos)); - - // sort strings - sort(m_strings.begin(), m_strings.end()); - - // write strings to file - for_each(m_strings.begin(), m_strings.end(), bind(&StringT::Write, _1, ref(*m_writer))); - - // store end offset - m_offsets.back().second = m_writer->Pos(); - - m_strings.clear(); + shared_ptr task( + new SortAndDumpStringsTask(*m_writer, m_offsets, m_strings)); + m_workerThread.Push(task); } bool StringsFile::PushNextValue(size_t i) @@ -127,6 +111,8 @@ void StringsFile::EndAdding() { Flush(); + m_workerThread.RunUntilIdleAndStop(); + m_writer->Flush(); } diff --git a/indexer/string_file.hpp b/indexer/string_file.hpp index b6904a6683..bdcd772231 100644 --- a/indexer/string_file.hpp +++ b/indexer/string_file.hpp @@ -3,8 +3,12 @@ #include "../coding/file_writer.hpp" #include "../coding/file_reader.hpp" +#include "../base/macros.hpp" +#include "../base/mem_trie.hpp" #include "../base/string_utils.hpp" +#include "../base/worker_thread.hpp" +#include "../coding/read_write_utils.hpp" #include "../std/iterator_facade.hpp" #include "../std/queue.hpp" #include "../std/functional.hpp" @@ -37,6 +41,8 @@ public: strings::UniString const & GetString() const { return m_name; } + ValueT const & GetValue() const { return m_val; } + template void SerializeValue(TCont & cont) const { cont.assign(m_val.begin(), m_val.end()); @@ -55,6 +61,58 @@ public: } }; + /// This class encapsulates a task to efficiently sort a bunch of + /// strings and writes them in a sorted oreder. + class SortAndDumpStringsTask + { + public: + /// A class ctor. + /// + /// \param writer A writer that will be used to write strings. + /// \param offsets A list of offsets [begin, end) that denote + /// groups of sorted strings in a file. When strings will be + /// sorted and dumped on a disk, a pair of offsets will be added + /// to the list. + /// \param strings Vector of strings that should be sorted. Internal data is moved out from + /// strings, so it'll become empty after ctor. + SortAndDumpStringsTask(FileWriter & writer, vector> & offsets, + std::vector & strings) + : m_writer(writer), m_offsets(offsets) + { + strings.swap(m_strings); + } + + /// Sorts strings via in-memory trie and writes them. + void operator()() + { + vector memBuffer; + { + my::MemTrie trie; + for (auto const & s : m_strings) + trie.Add(s.GetString(), s.GetValue()); + MemWriter> memWriter(memBuffer); + trie.ForEach([&memWriter](const strings::UniString & s, const ValueT & v) + { + rw::Write(memWriter, s); + rw::WriteVectorOfPOD(memWriter, v); + }); + } + + uint64_t const spos = m_writer.Pos(); + m_writer.Write(&memBuffer[0], memBuffer.size()); + uint64_t const epos = m_writer.Pos(); + m_offsets.push_back(make_pair(spos, epos)); + m_writer.Flush(); + } + + private: + FileWriter & m_writer; + vector> & m_offsets; + vector m_strings; + + DISALLOW_COPY_AND_MOVE(SortAndDumpStringsTask); + }; + class IteratorT : public iterator_facade { StringsFile & m_file; @@ -96,9 +154,16 @@ private: bool PushNextValue(size_t i); vector m_strings; - // store start and end offsets of file portions + + // Contains start and end offsets of file portions. vector > m_offsets; + // A worker thread that sorts and writes groups of strings. The + // whole process looks like a pipeline, i.e. main thread accumulates + // strings while worker thread sequentially sorts and stores groups + // of strings on a disk. + my::WorkerThread m_workerThread; + struct QValue { StringT m_string; diff --git a/std/memory.hpp b/std/memory.hpp new file mode 100644 index 0000000000..c0edfa8943 --- /dev/null +++ b/std/memory.hpp @@ -0,0 +1,15 @@ +#pragma once +#include "common_defines.hpp" + +#ifdef new +#undef new +#endif + +#include + +using std::make_shared; +using std::shared_ptr; + +#ifdef DEBUG_NEW +#define new DEBUG_NEW +#endif diff --git a/std/mutex.hpp b/std/mutex.hpp index 161b168998..0f024c0647 100644 --- a/std/mutex.hpp +++ b/std/mutex.hpp @@ -7,7 +7,9 @@ #include +using std::lock_guard; using std::mutex; +using std::unique_lock; #ifdef DEBUG_NEW #define new DEBUG_NEW diff --git a/std/thread.hpp b/std/thread.hpp new file mode 100644 index 0000000000..0e9cf61426 --- /dev/null +++ b/std/thread.hpp @@ -0,0 +1,14 @@ +#pragma once +#include "common_defines.hpp" + +#ifdef new +#undef new +#endif + +#include + +using std::thread; + +#ifdef DEBUG_NEW +#define new DEBUG_NEW +#endif