ask

use the reinforcement learning with human feedback and rag rest api to ask the underlying llm a question and wait for the results

Parameters:
  • question (str) –

    question to ask the llm

  • collection_id (str) –

    embedding alias name to use for the rag source data

  • email (str, default: None ) –

    optional - user email for the rest api

  • password (str, default: None ) –

    optional - user password for the rest api

  • username (str, default: None ) –

    optional - username for the rest api

  • job_params (dict, default: None ) –

    optional - llm question properties and attributes

  • cfg_core (dict, default: None ) –

    optional - CoreConfig dictionary

  • wait_for_result (bool, default: True ) –

    optional flag - with default set to True. When True this function will wait for the CoreResultAI before returning. When False this function will start the llm job and then return a None for the CoreResultAI in the returned tuple (CoreResultJob, None). please use the CoreJob.id to periodically check if the job is done.

  • wait_interval (float, default: 2.0 ) –

    float - optional - how many seconds to wait before trying to get the CoreResultAI record from the rest api

Returns:
  • (CoreUser, CoreResultAI, CoreResultAI)

    on success (CoreUser, CoreResultAI, CoreResultAI) versus non-success can return (None, None, None)

client_aic/ask.py
def ask(
    question: str,
    collection_id: str,
    email: str = None,
    password: str = None,
    username: str = None,
    job_params: dict = None,
    cfg_core: dict = None,
    wait_for_result: bool = True,
    wait_interval: float = 2.0,
):
    """
    ask

    use the reinforcement learning with human
    feedback and rag rest api to ask the
    underlying llm a question
    and wait for the results

    :param question: question to ask the llm
    :param collection_id: embedding alias name
        to use for the rag source data
    :param email: optional - user email for the rest api
    :param password: optional - user password for the rest api
    :param username: optional - username for the rest api
    :param job_params: optional - llm question
        properties and attributes
    :param cfg_core: optional - **CoreConfig** dictionary
    :param wait_for_result: optional flag -
        with default set to **True**.
        When **True** this function will wait for the
        **CoreResultAI** before returning.
        When **False** this function
        will start the llm job and then return a **None**
        for the **CoreResultAI** in the
        returned tuple (**CoreResultJob**, None). please
        use the CoreJob.id to periodically check if the
        job is done.
    :param wait_interval: float - optional - how
        many seconds to wait before trying to get the
        **CoreResultAI** record from the rest api

    :returns: on success (**CoreUser**, **CoreResultAI**,
        **CoreResultAI**) versus non-success can return
        (**None**, **None**, **None**)
    :rtype: (CoreUser, CoreResultAI, CoreResultAI)
    """
    res_job = None
    res_ai = None
    user = None
    debug = os.getenv("LLM_DEBUG", "0") == "1"
    cfg = cfg_core
    if not cfg_core:
        cfg = get_cfg.get_cfg()
    cfg_user = cfg.get("user", {})
    if not username:
        username = os.getenv(
            "AI_USERNAME", cfg_user.get("u", None)
        )
    if not password:
        password = os.getenv(
            "AI_PASSWORD", cfg_user.get("p", None)
        )
    if not email:
        email = os.getenv(
            "AI_EMAIL", cfg_user.get("e", None)
        )
    missing_env_vars = []
    if not question or len(question) < 4:
        log.error(
            "please ask a question more than 4 characters"
        )
        return (user, res_job, res_ai)
    if not email:
        missing_env_vars.append("AI_EMAIL")
    if not password:
        missing_env_vars.append("AI_PASSWORD")
    if len(missing_env_vars) > 0:
        missing_str = ", ".join(missing_env_vars)
        log.error(
            "please set these environment variables "
            f"and retry: {missing_str}"
        )
        return (user, res_job, res_ai)
    # name of a pgvector embedding db
    # database connection dict
    user = auth.authenticate(
        username=username,
        email=email,
        password=password,
        cfg=cfg,
    )
    if not user:
        log.error(f"failed to login as user: {username}")
        return (user, res_job, res_ai)
    log.info(
        f"user={username} asking='{question}' "
        f"embedding collection_id={collection_id} "
        "please be patient while the llm responds "
        "there could be a lot of users on the system "
        "at this time"
    )
    create_job_res = run_job_ask.run_job_ask(
        question=question,
        user=user,
        collection_id=collection_id,
        cfg=cfg,
    )
    if not create_job_res:
        log.error("failed to start job ")
        return (user, res_job, res_ai)
    job_id = int(create_job_res.id)
    if wait_for_result:
        log.debug(
            f"waiting for job_id={job_id} "
            f"to finish sleeping {wait_interval}s "
            "per retry"
        )
    else:
        log.debug(f"not waiting for job_id={job_id}")
        res_job = core_result_job.CoreResultJob(
            job_id=job_id,
            user_id=user.id,
            state=create_job_res.state,
        )
        return (user, res_job, res_ai)

    if job_id:
        if job_id < 1:
            log.error(
                "please use a positive integer "
                f"job_id={job_id} value"
            )
            return (user, res_job, res_ai)
        log.debug(
            "getting account.ai_result where "
            f"job.id = {job_id}"
        )
        not_done = True
        while not_done:
            res_job = get_job_result.get_job_result(
                id=job_id, user=user, cfg=cfg
            )
            if not res_job:
                if wait_for_result:
                    log.debug(
                        f"waiting job_id={job_id} "
                        f"{wait_interval}s"
                    )
                    time.sleep(wait_interval)
                    continue
                else:
                    log.error(
                        f"did not find the job_id={job_id}"
                    )
                    return (user, res_job, res_ai)
            # get by the new result's parent job id
            # that should be the same as job_id
            res_ai = get_ai_result.get_ai_result(
                id=res_job.job_id,
                user=user,
                cfg=cfg,
            )
            if not res_ai:
                log.error(
                    "failed getting ai result: "
                    f"job_id={res_job.id}"
                )
                return (user, res_job, res_ai)
            if debug:
                log.debug(
                    f"got ai result id={res_ai.id} "
                    f"answer={res_ai.answer} "
                    f"job_id={res_job.job_id}"
                    f"job_result_id={res_job.id}"
                    f"answer: {res_ai.answer}"
                )
            not_done = False
        # end of while
    else:
        log.error("no job_id detected on command line")

    if not res_ai:
        log.error(
            "failed getting ai result: " f"job_id={job_id}"
        )
        return (user, res_job, res_ai)
    if user and res_job and res_ai:
        if debug:
            log.debug(
                f"user={user.id} job_id={res_job.id} "
                f"has ai results:\n{ppj.ppj(res_ai.get_dict())}"
            )
        else:
            log.debug(
                f"answer for user={user.id} job_id={res_job.id} "
                f"ai_result_id={res_ai.id}"
            )
    else:
        log.error(
            f"user={user.id} ask hit unexpected return case "
            "\n"
            f"res_job={res_job}\n"
            f"res_ai={res_ai}\n"
        )
    return (user, res_job, res_ai)