Source code for coffea_casa.coffea_casa

"""CoffeaCasaCluster class"""
import os
from pathlib import Path
import sys
import socket
import dask
from dask_jobqueue.htcondor import HTCondorCluster, HTCondorJob
from distributed.security import Security

# Port settings
DEFAULT_SCHEDULER_PORT = 8786
DEFAULT_DASHBOARD_PORT = 8785
DEFAULT_CONTAINER_PORT = 8786
DEFAULT_NANNY_PORT = 8001

# Security settings for Dask scheduler
SECRETS_DIR = Path("/etc/cmsaf-secrets")
SECRETS_DIR_CHOWN = Path("/etc/cmsaf-secrets-chown")
CA_FILE = SECRETS_DIR / "ca.pem"
CERT_FILE = SECRETS_DIR / "hostcert.pem"
KEY_FILE = SECRETS_DIR / "hostkey.pem"
HOME_DIR = Path.home()
PIP_REQUIREMENTS = HOME_DIR / "requirements.txt"

# conda, with yml/yaml both supported
if (HOME_DIR / "environment.yaml").is_file():
    CONDA_ENV = HOME_DIR / "environment.yaml"
else:
    CONDA_ENV = HOME_DIR / "environment.yml"


def bearer_token_path():
    """Return the path to the bearer token or None if not found"""
    def check_token_path(path, suffix=''):
        token_path = f'{path}{suffix}'
        if Path(token_path).is_file():
            return token_path
        return None

    # 1. Check BEARER_TOKEN_FILE env variable
    try:
        path = check_token_path(os.environ['BEARER_TOKEN_FILE'])
        if path:
            return path
    except KeyError:
        pass

    # 2. Check /etc/cmsaf-secrets-chown/access_token (CMS AF mounted token)
    path = check_token_path('/etc/cmsaf-secrets-chown/access_token')
    if path:
        return path

    # 3. Check XDG_RUNTIME_DIR + /bt_u$UID
    try:
        xdg_runtime_dir = os.environ['XDG_RUNTIME_DIR']
        path = check_token_path(xdg_runtime_dir, suffix=f'/bt_u{os.geteuid()}')
        if path:
            return path
    except KeyError:
        pass

    # 4. Check /tmp/bt_u$UID
    path = check_token_path(f'/tmp/bt_u{os.geteuid()}')
    if path:
        return path

    return None


def x509_user_proxy_path():
    """Return the path to the user's X.509 proxy or None if not found"""
    try:
        path = os.environ['X509_USER_PROXY']
    except KeyError:
        path = f'/tmp/x509up_u{os.geteuid()}'

    if Path(path).is_file():
        return path
    return None


def merge_dicts(*dict_args):
    """Merge dictionaries, precedence goes to latter dictionaries"""
    result = {}
    for dictionary in dict_args:
        result.update(dictionary)
    return result


class CoffeaCasaJob(HTCondorJob):
    submit_command = "condor_submit -spool"
    config_name = "coffea-casa"


[docs] class CoffeaCasaCluster(HTCondorCluster): """ Subclass for launching Dask via HTCondor in CMS facilities. """ job_cls = CoffeaCasaJob config_name = "coffea-casa" def __init__(self, *, security=None, force_tcp=False, worker_image=None, scheduler_options=None, scheduler_port=DEFAULT_SCHEDULER_PORT, dashboard_port=DEFAULT_DASHBOARD_PORT, nanny_port=DEFAULT_NANNY_PORT, check_ports=False, **job_kwargs): """ Parameters ---------- security : distributed.Security, optional Security object for TLS configuration force_tcp : bool, default False Force TCP instead of TLS worker_image : str, optional Docker image for workers scheduler_port : int, default 8786 Scheduler port dashboard_port : int, default 8785 Dashboard port nanny_port : int, default 8001 Nanny port check_ports : bool, default False Check if ports are available before starting **job_kwargs Additional job configuration """ self._force_tcp = force_tcp # FIX 1: Sanitize dashboard_address boolean from Labextension # The Labextension can inject dashboard_address=True (a boolean) into # dask config, which causes format_dashboard_link() to crash with: # AttributeError: 'bool' object has no attribute 'format' raw_dashboard_link = dask.config.get("distributed.dashboard.link", None) if isinstance(raw_dashboard_link, bool): dask.config.set({"distributed.dashboard.link": "http://{host}:{port}/status"}) # FIX 2: Patch worker TLS config in dask.yaml if it's null # Some dask.yaml files have: # distributed.comm.tls.worker.cert: null # distributed.comm.tls.worker.key: null # This causes ssl_context=None at worker bind time if not force_tcp: worker_cert = dask.config.get("distributed.comm.tls.worker.cert", None) worker_key = dask.config.get("distributed.comm.tls.worker.key", None) if not worker_cert or not worker_key: dask.config.set({ "distributed.comm.tls.worker.cert": str(CERT_FILE), "distributed.comm.tls.worker.key": str(KEY_FILE if KEY_FILE.is_file() else CERT_FILE), }) # FIX 3: Align dask.config require-encryption with actual TLS state # dask.yaml may have require-encryption: true even when force_tcp=True # SpecCluster reads this independently and refuses tcp:// connections will_use_tls = ( not force_tcp and CA_FILE.is_file() and CERT_FILE.is_file() and security_obj().get_connection_args("scheduler").get("require_encryption", False) ) dask.config.set({"distributed.comm.require-encryption": will_use_tls}) # FIX 4: Optional port conflict check (disabled by default) if check_ports: for port in (scheduler_port, dashboard_port, nanny_port): if port: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind(("0.0.0.0", port)) except OSError: raise RuntimeError(f"Port {port} already in use.") if security: self.security = security job_kwargs = self._modify_job_kwargs( job_kwargs, security=security, force_tcp=force_tcp, worker_image=worker_image, scheduler_port=scheduler_port, dashboard_port=dashboard_port, nanny_port=nanny_port, ) # Prevent dask-jobqueue from creating a local worker # We only want HTCondor workers, not local workers job_kwargs['n_workers'] = 0 super().__init__(**job_kwargs) @classmethod def _modify_job_kwargs(cls, job_kwargs, *, security=None, force_tcp=False, worker_image=None, scheduler_options=None, scheduler_port=DEFAULT_SCHEDULER_PORT, dashboard_port=DEFAULT_DASHBOARD_PORT, nanny_port=DEFAULT_NANNY_PORT): job_config = job_kwargs.copy() input_files = [] if PIP_REQUIREMENTS.is_file(): input_files.append(PIP_REQUIREMENTS) if CONDA_ENV.is_file(): input_files.append(CONDA_ENV) # If we have certs and not forcing TCP, use TLS if (not force_tcp and CA_FILE.is_file() and CERT_FILE.is_file() and security_obj().get_connection_args("scheduler")["require_encryption"]): job_config["protocol"] = "tls://" job_config["security"] = security_obj() input_files += [CA_FILE, CERT_FILE] # Add separate key file if it exists if KEY_FILE.is_file() and KEY_FILE != CERT_FILE: input_files.append(KEY_FILE) else: job_config["protocol"] = "tcp://" # Add bearer token if found token_file = bearer_token_path() if token_file: input_files.append(token_file) files = ", ".join(str(path) for path in input_files) # Networking settings try: external_ip = os.environ.get("POD_IP") or os.environ.get("HOST_IP") or socket.getfqdn() except Exception: print("Warning: Could not determine external IP") external_ip = "127.0.0.1" scheduler_protocol = job_config["protocol"] address_list = [external_ip, str(scheduler_port)] external_address_short = ":".join(address_list) full_address_list = [scheduler_protocol, external_address_short] contact_address = "".join(full_address_list) external_ip_string = f'"{contact_address}"' dash_port = f":{dashboard_port}" # Set dashboard link for Labextension # Priority: DASK_DASHBOARD_LINK env > JupyterHub proxy > direct access dashboard_link = os.environ.get("DASK_DASHBOARD_LINK") if not dashboard_link: # Check if running in JupyterHub (has JUPYTERHUB_SERVICE_PREFIX) jupyterhub_prefix = os.environ.get("JUPYTERHUB_SERVICE_PREFIX") if jupyterhub_prefix: # JupyterHub proxy pattern: /user/<username>/proxy/<port>/status # JUPYTERHUB_SERVICE_PREFIX typically ends with / (e.g., "/user/name/") # Ensure it ends with / before appending if not jupyterhub_prefix.endswith('/'): jupyterhub_prefix += '/' dashboard_link = f"{jupyterhub_prefix}proxy/{dashboard_port}/status" else: # Try to construct direct access URL external_hostname = os.environ.get("EXTERNAL_HOSTNAME") if external_hostname: # Direct access with hostname (no port in URL for https) dashboard_link = f"https://{external_hostname}/status" else: # Fall back to pod IP (may not be accessible from browser) dashboard_link = f"http://{external_ip}:{dashboard_port}/status" dask.config.set({"distributed.dashboard.link": dashboard_link}) print(f"Dashboard will be available at: {dashboard_link}") # Scheduler settings job_config["scheduler_options"] = merge_dicts( { "port": scheduler_port, "dashboard_address": dash_port, "protocol": scheduler_protocol.replace("://", ""), "contact_address": contact_address, }, job_kwargs.get("scheduler_options", dask.config.get(f"jobqueue.{cls.config_name}.scheduler-options", {})), ) # Check for x509 proxy proxy = x509_user_proxy_path() use_proxy = proxy is not None # Job extra settings (HTCondor ClassAd) job_config["job_extra_directives"] = merge_dicts( { "universe": "docker", "docker_image": worker_image or dask.config.get(f"jobqueue.{cls.config_name}.worker-image"), "container_service_names": "dask,nanny", "dask_container_port": DEFAULT_CONTAINER_PORT, "nanny_container_port": DEFAULT_NANNY_PORT, "use_x509userproxy": use_proxy, "transfer_input_files": files, "encrypt_input_files": files, "transfer_output_files": "", "when_to_transfer_output": "ON_EXIT", "should_transfer_files": "YES", "+CoffeaCasaWorkerType": '"dask"', "+DaskSchedulerAddress": external_ip_string, "+AccountingGroup": '"cms.other.coffea.$ENV(HOSTNAME)"', }, job_kwargs.get("job_extra_directives", dask.config.get(f"jobqueue.{cls.config_name}.job_extra_directives", {})), ) return job_config
def security_obj(): """Return the Dask Security object used by CoffeaCasa""" ca_file = str(CA_FILE) cert_file = str(CERT_FILE) key_file = str(KEY_FILE if KEY_FILE.is_file() else CERT_FILE) return Security( tls_ca_file=ca_file, tls_worker_cert=cert_file, tls_worker_key=key_file, tls_client_cert=cert_file, tls_client_key=key_file, tls_scheduler_cert=cert_file, tls_scheduler_key=key_file, require_encryption=True, )