
"OpenBLAS warning: precompiled NUM_THREADS exceeded" and "resource_tracker: There appear to be ## leaked semaphore objects to clean up at shutdown" #119

carusyte opened this issue on Dec 27, 2024

After one or two iterations, the program terminates with the following warnings:

OpenBLAS warning: precompiled NUM_THREADS exceeded, adding auxiliary array for thread metadata.
To avoid this warning, please rebuild your copy of OpenBLAS with a larger NUM_THREADS setting
or set the environment variable OPENBLAS_NUM_THREADS to 64 or lower
double free or corruption (out)
/home/kemove/.pyenv/versions/3.12.2/lib/python3.12/multiprocessing/resource_tracker.py:254: UserWarning: resource_tracker: There appear to be 96 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

I understand that I could set OPENBLAS_NUM_THREADS=1 before the program starts to sidestep the issue, as some Google results suggest, but that also slows down the entire process.
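For reference, the workaround would look something like this; the variable only takes effect if it is set before NumPy (and therefore OpenBLAS) is first imported:

import os

# Must run before the first "import numpy", otherwise OpenBLAS has already
# initialized its thread pool and the setting is ignored.
os.environ["OPENBLAS_NUM_THREADS"] = "1"

import numpy as np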

Here's my code:

@scheduler.custom(n_jobs=n_jobs)
def objective(params_batch):
    jobs = []
    t1 = time.time()
    nworker = num_workers(False)
    client.set_metadata(["workload_info", "total"], len(params_batch))
    client.set_metadata(["workload_info", "workers"], nworker)
    client.set_metadata(["workload_info", "finished"], 0)
    for i, params in enumerate(params_batch):
        new_df = df
        hpid, _ = get_hpid(params)
        priority = 1
        if model:
            priority = (
                priority + 1 if model.trainable_on_cpu(**params) else priority
            )
        if "topk_covar" in params:
            if "covar_dist" in params:
                new_df = select_randk_covars(
                    df, ranked_features, params["covar_dist"], params["topk_covar"]
                )
            else:
                new_df = select_topk_features(
                    df, ranked_features, params["topk_covar"]
                )
        future = client.submit(
            validate_hyperparams,
            args,
            new_df,
            covar_set_id,
            hps_id,
            params,
            resources={"POWER": power_demand(args, params)},
            retries=1,
            locks=locks,
            key=f"{validate_hyperparams.__name__}-{hpid}",
            priority=priority,
        )
        future.add_done_callback(hps_task_callback)
        jobs.append(future)
        if i < nworker:
            # stagger the first wave of submissions a little
            interval = random.randint(1000, 5000) / 1000.0
            time.sleep(interval)
    # gather results, skipping futures that errored instead of raising
    results = client.gather(jobs, errors="skip")
    elapsed = round(time.time() - t1, 3)
    # logger.info("gathered results type %s, len: %s", type(results), len(results))
    # logger.info("gathered results: %s", results)
    results = [(p, l) for p, l in results if p is not None and l is not None]
    if len(results) == 0:
        logger.warning(
            "Results not available at this iteration. Elapsed: %s", elapsed
        )
        return [], []
    params, loss = zip(*results)
    params = list(params)
    loss = list(loss)
    logger.info("Elapsed: %s, Successful results: %s", elapsed, len(results))
    # restart client here to free up memory
    if args.restart_workers:
        client.restart()
    return params, loss

warmstart_tuples = None
if resume:
    warmstart_tuples = preload_warmstart_tuples(
        args.model,
        args.symbol,
        covar_set_id,
        hps_id,
        args.batch_size * iteration * 2,
        len(ranked_features),
    )
if warmstart_tuples is not None:
    logger.info(
        "preloaded %s historical searched hyper-params for warm-start.",
        len(warmstart_tuples),
    )
else:
    logger.info("no available historical data for warm-start")
tuner = Tuner(
    space,
    objective,
    dict(
        initial_random=n_jobs,
        batch_size=n_jobs,
        num_iteration=iteration,
        initial_custom=warmstart_tuples,
        domain_size=domain_size,
    ),
)
results = tuner.minimize()
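A possible middle ground, which I haven't verified against this crash, would be to cap only the BLAS thread pools inside each Dask worker via threadpoolctl, instead of exporting OPENBLAS_NUM_THREADS=1 for the whole program. A rough sketch (threadpool_limits and Client.run are the actual threadpoolctl/distributed APIs; whether this avoids the leak here is an assumption):

def cap_blas_threads():
    from threadpoolctl import threadpool_limits
    # Limit OpenBLAS (and any other loaded BLAS) to one thread in this
    # worker process; keep a reference so the limiter stays alive.
    cap_blas_threads._limiter = threadpool_limits(limits=1, user_api="blas")

# apply on every currently connected worker
client.run(cap_blas_threads)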