Get a Job Result

get_job_result

get the user's job results from the db by the CoreJob.id

helpful for tracking job progress status.

Parameters:
  • id (int) –

    CoreJob.id for an existing user job

  • user (CoreUser) –

    authenticated user that is making this request

  • cfg (dict, default: None ) –

    optional - CoreConfig dictionary

Returns:
  • CoreResultJob | None

    CoreResultJob on success None on non-success

client_aic/req/job/get_job_result.py
def get_job_result(
    id: int,
    user: core_user.CoreUser,
    cfg: dict = None,
):
    """
    get_job_result

    get the user's job results from the db
    by the **CoreJob.id**

    helpful for tracking job progress status.

    :param id: CoreJob.id for an existing user job
    :param CoreUser user: authenticated user
        that is making this request
    :param cfg: optional - **CoreConfig** dictionary

    :returns: **CoreResultJob** on success
        **None** on non-success
    :rtype: CoreResultJob or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/job/result/{id}'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    debug = cfg.get("debug", False)
    log.debug(f"get job result: {url}")
    s = requests.Session()
    s.headers.update({"Bearer": f"{user.token}"})
    data = {"user_id": user.id, "job_id": id}
    r = s.get(
        url,
        json=data,
        verify=verify,
        cert=(cert_file, key_file),
        timeout=10,
    )
    if r.status_code != 200:
        if debug:
            log.info(
                "\n\n"
                "non-200 response:\n"
                f"  url: {url}\n"
                f"  job_result.id: {id}\n"
                f"  ca={verify}\n"
                f"  response:\n"
                f"  code: {r.status_code}\n"
                f"  text:\n"
                f"  {r.text}\n"
            )
        return None
    else:
        try:
            cur_json = json.loads(r.text)
            cur_o = core_result_job.CoreResultJob(
                id=cur_json.get("id", None),
                job_id=cur_json.get("job_id", None),
                worker_id=cur_json.get("worker_id", None),
                job_type=cur_json.get("job_type", None),
                user_id=cur_json.get("user_id", None),
                state=cur_json.get("state", None),
                data=cur_json.get("data", None),
                msg=cur_json.get("msg", None),
                created_at=cur_json.get("created_at", None),
                updated_at=cur_json.get("updated_at", None),
            )
            return cur_o
        except Exception as e:
            log.error(
                f'failed to get job_result.id={id} with ex="{e}"'
            )
    return None