From e9bb3a7d1899e52fa8077b2993e5a319f68a002d Mon Sep 17 00:00:00 2001
From: lunastera
Date: Wed, 2 Sep 2020 15:26:29 +0900
Subject: [PATCH 1/3] Corrected info about elements of the job queue

---
 gensim/models/word2vec.py | 34 ++++++++++++++++------------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/gensim/models/word2vec.py b/gensim/models/word2vec.py
index d35a9b4599..06ef7a045c 100755
--- a/gensim/models/word2vec.py
+++ b/gensim/models/word2vec.py
@@ -1045,10 +1045,10 @@ def _worker_loop(self, job_queue, progress_queue):
 
         Parameters
         ----------
-        job_queue : Queue of (list of objects, (str, int))
+        job_queue : Queue of (list of objects, float)
             A queue of jobs still to be processed. The worker will take up jobs from this queue.
             Each job is represented by a tuple where the first element is the corpus chunk to be processed and
-            the second is the dictionary of parameters.
+            the second is the floating-point learning rate.
         progress_queue : Queue of (int, int, int)
             A queue of progress reports. Each report is represented as a tuple of these 3 elements:
                 * Size of data chunk processed, for example number of sentences in the corpus chunk.
@@ -1064,12 +1064,12 @@ def _worker_loop(self, job_queue, progress_queue):
             if job is None:
                 progress_queue.put(None)
                 break  # no more jobs => quit this worker
-            data_iterable, job_parameters = job
+            data_iterable, alpha = job
 
             for callback in callbacks:
                 callback.on_batch_begin(self)
 
-            tally, raw_tally = self._do_train_job(data_iterable, job_parameters, thread_private_mem)
+            tally, raw_tally = self._do_train_job(data_iterable, alpha, thread_private_mem)
 
             for callback in callbacks:
                 callback.on_batch_end(self)
@@ -1088,10 +1088,10 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
         ----------
         data_iterator : iterable of list of objects
             The input dataset. This will be split in chunks and these chunks will be pushed to the queue.
-        job_queue : Queue of (list of object, dict of (str, int))
+        job_queue : Queue of (list of object, float)
             A queue of jobs still to be processed. The worker will take up jobs from this queue.
             Each job is represented by a tuple where the first element is the corpus chunk to be processed and
-            the second is the dictionary of parameters.
+            the second is the floating-point learning rate.
         cur_epoch : int, optional
             The current training epoch, needed to compute the training parameters for each job.
             For example in many implementations the learning rate would be dropping with the number of epochs.
@@ -1105,7 +1105,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
         """
         job_batch, batch_size = [], 0
         pushed_words, pushed_examples = 0, 0
-        next_job_params = self._get_job_params(cur_epoch)
+        next_alpha = self._get_current_alpha(cur_epoch)
         job_no = 0
 
         for data_idx, data in enumerate(data_iterator):
@@ -1118,7 +1118,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
                 batch_size += data_length
             else:
                 job_no += 1
-                job_queue.put((job_batch, next_job_params))
+                job_queue.put((job_batch, next_alpha))
 
                 # update the learning rate for the next job
                 if total_examples:
@@ -1129,14 +1129,14 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
                     # words-based decay
                     pushed_words += self._raw_word_count(job_batch)
                     epoch_progress = 1.0 * pushed_words / total_words
-                next_job_params = self._update_job_params(next_job_params, epoch_progress, cur_epoch)
+                next_alpha = self._update_alpha(epoch_progress, cur_epoch)
 
                 # add the sentence that didn't fit as the first item of a new job
                 job_batch, batch_size = [data], data_length
 
         # add the last job too (may be significantly smaller than batch_words)
         if job_batch:
             job_no += 1
-            job_queue.put((job_batch, next_job_params))
+            job_queue.put((job_batch, next_alpha))
 
         if job_no == 0 and self.train_count == 0:
             logger.warning(
@@ -1160,10 +1160,10 @@ def _log_epoch_progress(self, progress_queue=None, job_queue=None, cur_epoch=0,
                 * size of data chunk processed, for example number of sentences in the corpus chunk.
                 * Effective word count used in training (after ignoring unknown words and trimming the sentence length).
                 * Total word count used in training.
-        job_queue : Queue of (list of object, dict of (str, int))
+        job_queue : Queue of (list of object, float)
             A queue of jobs still to be processed. The worker will take up jobs from this queue.
             Each job is represented by a tuple where the first element is the corpus chunk to be processed and
-            the second is the dictionary of parameters.
+            the second is the floating-point learning rate.
         cur_epoch : int, optional
             The current training epoch, needed to compute the training parameters for each job.
             For example in many implementations the learning rate would be dropping with the number of epochs.
@@ -1342,7 +1342,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None, total_wo
 
         return trained_word_count, raw_word_count, job_tally
 
-    def _get_job_params(self, cur_epoch):
+    def _get_current_alpha(self, cur_epoch):
         """Get the learning rate used in the current epoch.
 
         Parameters
@@ -1359,13 +1359,11 @@ def _get_job_params(self, cur_epoch):
         alpha = self.alpha - ((self.alpha - self.min_alpha) * float(cur_epoch) / self.epochs)
         return alpha
 
-    def _update_job_params(self, job_params, epoch_progress, cur_epoch):
+    def _update_alpha(self, epoch_progress, cur_epoch):
         """Get the correct learning rate for the next iteration.
 
         Parameters
         ----------
-        job_params : dict of (str, obj)
-            UNUSED.
         epoch_progress : float
             Ratio of finished work in the current epoch.
         cur_epoch : int
@@ -1476,9 +1474,9 @@ def _log_progress(self, job_queue, progress_queue, cur_epoch, example_count, tot
 
         Parameters
         ----------
-        job_queue : Queue of (list of object, dict of (str, float))
+        job_queue : Queue of (list of object, float)
             The queue of jobs still to be performed by workers. Each job is represented as a tuple containing
-            the batch of data to be processed and the parameters to be used for the processing as a dict.
+            the batch of data to be processed and the floating-point learning rate.
         progress_queue : Queue of (int, int, int)
             A queue of progress reports. Each report is represented as a tuple of these 3 elements:
                 * size of data chunk processed, for example number of sentences in the corpus chunk.
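Note: the patch above only renames variables and corrects docstrings; each job on the queue is a (corpus chunk, alpha) tuple whose second element is a plain float learning rate, not a dict of parameters. A minimal, self-contained sketch of that producer/worker handshake follows; the toy corpus and the do_train_job helper are illustrative stand-ins, not gensim's _do_train_job.

from queue import Queue

# Illustrative stand-in for gensim's _do_train_job: count the words in the batch
# and pretend they were all trained. Not part of the library.
def do_train_job(batch, alpha):
    raw_words = sum(len(sentence) for sentence in batch)
    return raw_words, raw_words

job_queue = Queue()

# Producer side: each queued job is (corpus chunk, float learning rate).
job_queue.put(([["first", "sentence"], ["second", "sentence"]], 0.025))
job_queue.put(None)  # sentinel: no more jobs

# Worker side: unpack the chunk and the float alpha, then train on it.
while True:
    job = job_queue.get()
    if job is None:
        break  # no more jobs => quit this worker
    data_iterable, alpha = job
    tally, raw_tally = do_train_job(data_iterable, alpha)
    print("trained %d words at alpha=%g" % (tally, alpha))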
From 320cacd26e276ceed64a953b847f1b44f2400f93 Mon Sep 17 00:00:00 2001
From: lunastera
Date: Wed, 2 Sep 2020 16:37:19 +0900
Subject: [PATCH 2/3] Add unused args of `_update_alpha`

---
 gensim/models/word2vec.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/gensim/models/word2vec.py b/gensim/models/word2vec.py
index 06ef7a045c..7abb409511 100755
--- a/gensim/models/word2vec.py
+++ b/gensim/models/word2vec.py
@@ -1129,7 +1129,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
                     # words-based decay
                     pushed_words += self._raw_word_count(job_batch)
                     epoch_progress = 1.0 * pushed_words / total_words
-                next_alpha = self._update_alpha(epoch_progress, cur_epoch)
+                next_alpha = self._update_alpha(next_alpha, epoch_progress, cur_epoch)
 
                 # add the sentence that didn't fit as the first item of a new job
                 job_batch, batch_size = [data], data_length
@@ -1359,11 +1359,13 @@ def _get_current_alpha(self, cur_epoch):
         alpha = self.alpha - ((self.alpha - self.min_alpha) * float(cur_epoch) / self.epochs)
         return alpha
 
-    def _update_alpha(self, epoch_progress, cur_epoch):
+    def _update_alpha(self, alpha, epoch_progress, cur_epoch):
         """Get the correct learning rate for the next iteration.
 
         Parameters
         ----------
+        alpha : float
+            UNUSED.
         epoch_progress : float
             Ratio of finished work in the current epoch.
         cur_epoch : int
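Note: patch 2 above re-adds an unused alpha argument at the _update_alpha call site, and patch 3 below then folds _get_current_alpha and _update_alpha into a single _get_next_alpha(epoch_progress, cur_epoch): the epoch-start value is just the within-epoch schedule evaluated at progress 0.0, which is why the producer switches to self._get_next_alpha(0.0, cur_epoch). The merged method's body is not shown in this diff, so the snippet below is only a sketch of the linear schedule implied by the formula above, with alpha, min_alpha and epochs passed in explicitly rather than read from self.

def get_next_alpha(alpha, min_alpha, epochs, epoch_progress, cur_epoch):
    # Linear decay from `alpha` to `min_alpha` over all epochs; `epoch_progress`
    # is the fraction of the current epoch already processed.
    progress = (cur_epoch + epoch_progress) / epochs
    next_alpha = alpha - (alpha - min_alpha) * progress
    return max(min_alpha, next_alpha)

# With epoch_progress=0.0 this reduces to the removed _get_current_alpha formula:
# alpha - (alpha - min_alpha) * cur_epoch / epochs.
print(get_next_alpha(0.025, 0.0001, epochs=5, epoch_progress=0.0, cur_epoch=2))  # epoch start
print(get_next_alpha(0.025, 0.0001, epochs=5, epoch_progress=0.5, cur_epoch=2))  # mid-epoch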
- - """ - alpha = self.alpha - ((self.alpha - self.min_alpha) * float(cur_epoch) / self.epochs) - return alpha - - def _update_alpha(self, alpha, epoch_progress, cur_epoch): + def _get_next_alpha(self, epoch_progress, cur_epoch): """Get the correct learning rate for the next iteration. Parameters ---------- - alpha : float - UNUSED. epoch_progress : float Ratio of finished work in the current epoch. cur_epoch : int