
"Detach" cluster from Jupyter kernel #629

Open
lukas-koschmieder opened this issue Nov 9, 2021 · 6 comments

@lukas-koschmieder

When I start a cluster from a Jupyter notebook and then shut down the notebook, the cluster will also shut down. Is there a way to detach a cluster from a notebook kernel?

@lukas-koschmieder
Author

Background: I would like to use ipyparallel for a long-term parametric study (about two weeks). The basic idea is to

  1. start n engines from a Jupyter notebook,
  2. schedule m >> n jobs (via DirectView.map()?),
  3. leave / shut down the notebook,
  4. eventually return/restart the notebook to post-process the results.

Would this be feasible?
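
For concreteness, step 2 of the plan above might look roughly like the snippet below. This is a minimal sketch, not part of the original question: the connected client rc, the run_case function, and the parameter list are placeholders.

import ipyparallel as ipp

def run_case(params):
    # placeholder for a single parametric run
    return params

rc = ipp.Client()              # assumes a cluster is already running
dview = rc[:]                  # DirectView over all n engines
parameters = list(range(100))  # m >> n jobs
# map_async returns immediately with an AsyncMapResult;
# the engines keep working after this call returns
amr = dview.map_async(run_case, parameters)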

@minrk
Member

minrk commented Nov 9, 2021

Yes! The shutdown_atexit attribute governs whether the cluster is shut down during Python's atexit teardown:

c = ipp.Cluster(shutdown_atexit=False)

Or set the attribute at any time later:

cluster.shutdown_atexit = False

This is how the ipcluster start --daemonize command works.

Here's one way to initialize a notebook or script that will connect to an existing cluster, starting it if it doesn't exist yet:

import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"

try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)
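
Since shutdown_atexit=False means nothing tears the cluster down automatically, it has to be stopped explicitly once the study is finished. A minimal sketch of that cleanup, assuming the same cluster_id and the Cluster.stop_cluster_sync() method:

import ipyparallel as ipp

# reconnect to the detached cluster by id and shut it down for good
cluster = ipp.Cluster.from_file(cluster_id="tk421")
cluster.stop_cluster_sync()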

@lukas-koschmieder
Author

Thanks, this was very helpful! 👍

I'm using ipyparallel.client.view.DirectView.map() to run code on my cluster. Is there a way to restore the ipyparallel.client.asyncresult.AsyncMapResult object as well when reconnecting to an existing cluster (in case there is no shared/persistent file system for the results)?

@minrk
Member

minrk commented Nov 11, 2021

You can use the result database on the Hub for this (if you don't disable it and it doesn't fill up). It's not the most robust API, though.

If you save the msg_ids, you can construct an AsyncHubResult with client.get_result(msg_ids). It won't do the partition-reconstruction of an AsyncMapResult, but you will be able to get all the results. The main difference is that each element in the AsyncHubResult will be a chunk of results (e.g. list of lists) rather than the flat result list.

This workflow could be a lot better, but the following works, building on the script above:

import json
import os
import time

import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"

try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)

def task_function(i):
    import time  # noqa
    time.sleep(i)
    return i

task_file = "task.json"

if not os.path.isfile(task_file):
    # first run, submit the job
    print("submitting new job!")
    amr = rc[:].map_async(task_function, range(10))
    print(f"Saving task file to {task_file}: {amr.msg_ids}")
    with open(task_file, "w") as f:
        json.dump(amr.msg_ids, f)
    print("Interrupt me to stop waiting!")
    amr.wait_interactive()

else:
    # second run, wait for results
    print(f"Loading task from {task_file}")
    with open(task_file) as f:
        msg_ids = json.load(f)
    # get async hub result from the Hub
    ar = rc.get_result(msg_ids)
    # a reconstructed result will have as each element
    # a _chunk_ of results, not individual results, so nest iteration
    for result_chunk in ar:
        for result in result_chunk:
            print(result)
    # could also do results = itertools.chain(*list(ar))

@lukas-koschmieder
Author

Thank you for the solution and the warning.

if ... it doesn't fill up

What kind of database does it use? How much data can it store?

@minrk
Member

minrk commented Nov 12, 2021

The docs are here. It's in-memory by default, and the default limit at which records start being culled is 1024 tasks or 1 GB of results, whichever comes first. These can be configured with c.DictDB.record_limit and c.DictDB.size_limit, respectively. If you use an SQLite database, there is no limit (the task database will grow forever unless you explicitly delete it).
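
For reference, a minimal sketch of these options in an ipcontroller_config.py (the "SQLiteDB" shorthand for c.HubFactory.db_class is an assumption here; check the docs for the exact spelling):

c = get_config()  # noqa

# raise the culling thresholds of the default in-memory DictDB
c.DictDB.record_limit = 4096        # default: 1024 tasks
c.DictDB.size_limit = 2 * 1024**3   # default: 1 GB of results

# or switch to a persistent sqlite task database (never culled,
# grows until you delete the file yourself)
# c.HubFactory.db_class = "SQLiteDB"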
