create_ai_result(user, data, cfg=None)

create_ai_result

provides an api for remote llm(s) to stream results over https without being co-located with the llm(s)

e.g. run synthetic dataset generation jobs in the cloud (aws, gcp, azure, runpod.io) and stream the results to your self-hosted rest api

Parameters:
  • user (CoreUser) –

    authenticated user that is making this request

  • data (dict) –

    dictionary from data=CoreResultAI.get_dict()

  • cfg (dict, default: None ) –

    optional CoreConfig dictionary

Returns:
  • CoreResultAI | None

    CoreResultAI on success, None on non-success

client_aic/req/ai/create_ai_result.py
def create_ai_result(
    user: core_user.CoreUser,
    data: dict,
    cfg: dict = None,
):
    """
    create_ai_result

    provides an api for remote llm(s)
    to stream results over https
    without being co-located with the
    llm(s)

    e.g. run synthetic dataset generation
    jobs in the cloud (aws, gcp, azure, runpod.io)
    and stream the results to your self-hosted rest api

    the ``id`` key (if present) is left out of the
    posted payload; the caller's ``data`` dictionary
    is not modified

    :param CoreUser user: authenticated user
        that is making this request
    :param data: dictionary from
        ``data=CoreResultAI.get_dict()``
    :param cfg: optional **CoreConfig**
        dictionary (loaded with ``get_cfg.get_cfg()``
        when not provided)

    :returns: **CoreResultAI** on success
        **None** on a non-201 response or when
        the response body cannot be loaded
    :rtype: CoreResultAI or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/ai/result'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    debug = cfg.get("debug", False)
    log.debug(f"create ai result: {url}")
    # send a copy without the "id" key instead of popping it
    # from the caller's dictionary
    payload = {
        key: value for key, value in data.items() if key != "id"
    }
    # the context manager closes the session's pooled
    # connections once the response has been read
    with requests.Session() as s:
        s.headers.update({"Bearer": f"{user.token}"})
        r = s.post(
            url,
            json=payload,
            verify=verify,
            cert=(cert_file, key_file),
            timeout=5,
        )
    if r.status_code != 201:
        # note: there is no record id to report here - the
        # previous message formatted the builtin ``id`` function
        log.error(
            "\n\n"
            "non-201 response:\n"
            f"  url: {url}\n"
            f"  ca={verify}\n"
            f"  response:\n"
            f"  code: {r.status_code}\n"
            f"  text:\n"
            f"  {r.text}\n"
        )
        return None
    if debug:
        log.debug(f"create ai result success - {r.text}")
    try:
        rec_dict = json.loads(r.text)
        cur_o = core_result_ai.CoreResultAI()
        cur_o.load_response_dict(rec_dict=rec_dict)
        return cur_o
    except Exception as e:
        log.error(
            f'failed to create ai result with ex="{e}"'
        )
    return None

get_ai_result(id, user, cfg=None)

get_ai_result

after the llm finishes processing the question, you can get the ai results table using this method

get the ai result by the CoreJob.id value using the rest api

Parameters:
  • id (int) –

    look up this CoreJob.id's ai results

  • user (CoreUser) –

    authenticated user that is making this request

  • cfg (dict, default: None ) –

    optional CoreConfig dictionary

Returns:
  • CoreResultAI | None

    CoreResultAI on success, None on non-success

client_aic/req/ai/get_ai_result.py
def get_ai_result(
    id: int,
    user: core_user.CoreUser,
    cfg: dict = None,
):
    """
    get_ai_result

    after the llm finishes processing the question,
    you can get the ai results table using this
    method

    get the ai result by the **CoreJob.id** value
    using the rest api

    :param id: look up this **CoreJob.id**'s
        ai results
    :param CoreUser user: authenticated user
        that is making this request
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when not
        provided)

    :returns: **CoreResultAI** on success
        **None** on a non-200 response or when
        the response body cannot be loaded
    :rtype: CoreResultAI or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/ai/result/{id}'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    debug = cfg.get("debug", False)
    log.debug(f"get ai result: {url}")
    data = {"user_id": user.id, "job_id": id}
    # the context manager closes the session's pooled
    # connections once the response has been read
    with requests.Session() as s:
        s.headers.update({"Bearer": f"{user.token}"})
        r = s.get(
            url,
            json=data,
            verify=verify,
            cert=(cert_file, key_file),
            timeout=5,
        )
    if r.status_code != 200:
        log.error(
            "\n\n"
            "non-200 response:\n"
            f"  url: {url}\n"
            f"  job_result.id: {id}\n"
            f"  ca={verify}\n"
            f"  response:\n"
            f"  code: {r.status_code}\n"
            f"  text:\n"
            f"  {r.text}\n"
        )
        return None
    if debug:
        log.info(f"get ai result success - {r.text}")
    # response keys copied 1:1 into the CoreResultAI
    # constructor; a missing key is passed as None
    result_fields = (
        "id",
        "user_id",
        "job_id",
        "worker_id",
        "state",
        "question",
        "answer",
        "model_name",
        "score",
        "question_score",
        "answer_score",
        "match_source",
        "match_page",
        "match_content",
        "summarized_question",
        "summarized_answer",
        "summarized_score",
        "reviewed_answer",
        "reviewed_score",
        "reviewed_computed_score",
        "reviewed_notes",
        "collection",
        "collection_notes",
        "category",
        "tags",
        "latency",
        "data",
        "created_at",
        "updated_at",
    )
    try:
        cur_json = json.loads(r.text)
        cur_o = core_result_ai.CoreResultAI(
            **{
                field: cur_json.get(field, None)
                for field in result_fields
            }
        )
        return cur_o
    except Exception as e:
        log.error(
            f'failed to get ai_result.id={id} with ex="{e}"'
        )
    return None

run_job_ask(question, user, cfg=None, session_id=None, derived_session_id=None, model_name=None, collection_id=None, collection_name=None, embed_model_name=None, tags=None, max_tokens=512, n_ctx=2048, n_batch=10, match_docs=3, max_doc_scores=3, min_q_score=0.3, min_a_score=0.3)

run_job_ask

ask the llm a question and get a CoreJob for tracking the progress

Run a generative ai job

Supports

  • question-answering (qa)
  • synthetic dataset generation
  • prompt template
  • user session tracking and historical context
  • reinforcement learning with human feedback
  • custom embedding models to use (hugging face supported)
  • custom rag embedding collection name or id to search
  • llm model parameter customization
Parameters:
  • question (str) –

    question to ask the llm

  • user (CoreUser) –

    authenticated user that is making this request

  • cfg (dict, default: None ) –

    optional CoreConfig dictionary

  • session_id (str, default: None ) –

    user session id for tracking historical context

  • derived_session_id (str, default: None ) –

    optional - sub tracking id for more granular historical context

  • model_name (str, default: None ) –

    llm model name

  • collection_id (str, default: None ) –

    search this rag collection id before asking the llm

  • collection_name (str, default: None ) –

    search this rag collection name before asking the llm

  • embed_model_name (str, default: None ) –

    name of the embedding model for this job

  • tags (str, default: None ) –

    comma-delimited tags for this question

  • max_tokens (int, default: 512 ) –

    supported max tokens on this request

  • n_ctx (int, default: 2048 ) –

    supported max context on this request (usually the llm is hardcoded to a specific context and you do not usually change this)

  • n_batch (int, default: 10 ) –

    number of batches for this question

  • match_docs (int, default: 3 ) –

    how many matching documents from the rag data source collection id/names are allowed before proceeding to the llm

  • max_doc_scores (int, default: 3 ) –

    how many document scores are required for this request

  • min_q_score (float, default: 0.3 ) –

    minimum rag data source confidence score as a valid source for the llm to use with this question

  • min_a_score (float, default: 0.3 ) –

    minimum rag data source confidence score as a valid source for the llm to use with this answer

Returns:
  • CoreJob | None

    CoreJob on success, None on non-success

client_aic/req/ai/run_job_ask.py
def run_job_ask(
    question: str,
    user: core_user.CoreUser,
    cfg: dict = None,
    session_id: str = None,
    derived_session_id: str = None,
    model_name: str = None,
    collection_id: str = None,
    collection_name: str = None,
    embed_model_name: str = None,
    tags: str = None,
    max_tokens: int = 512,
    n_ctx: int = 2048,
    n_batch: int = 10,
    match_docs: int = 3,
    max_doc_scores: int = 3,
    min_q_score: float = 0.3,
    min_a_score: float = 0.3,
):
    """
    run_job_ask

    ask the llm a question and get a **CoreJob**
    for tracking the progress

    Run a generative ai job

    Supports

    - question-answering (qa)
    - synthetic dataset generation
    - prompt template
    - user session tracking and historical context
    - reinforcement learning with human feedback
    - custom embedding models to use (hugging face supported)
    - custom rag embedding collection name or id to search
    - llm model parameter customization

    :param question: question to ask the llm
    :param user: authenticated user
        that is making this request
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when not
        provided)
    :param session_id: user session id for
        tracking historical context; a random
        32-character hex id is generated and sent
        when this is not provided
    :param derived_session_id: optional - sub
        tracking id for more granular historical context
    :param model_name: llm model name
        (default: ``mistral-7b-instruct-v0.1.Q4_K_M.gguf``)
    :param collection_id: search this
        rag collection id before asking the llm
    :param collection_name: search this
        rag collection name before asking the llm
    :param embed_model_name: name of the embedding
        model for this job
        (default: ``sentence-transformers/all-MiniLM-L6-v2``)
    :param tags: comma-delimited tags for this
        question
    :param max_tokens: supported max tokens
        on this request
    :param n_ctx: supported max context
        on this request (usually the llm is hardcoded
        to a specific context and you do not usually
        change this)
    :param n_batch: number of batches for
        this question
    :param match_docs: how many matching documents
        from the rag data source collection id/names
        are allowed before proceeding to the llm
    :param max_doc_scores: how many document scores
        are required for this request
    :param min_q_score: minimum rag data source
        confidence score as a valid source for the llm
        to use with this question
    :param min_a_score: minimum rag data source
        confidence score as a valid source for the llm
        to use with this answer

    :returns: **CoreJob** on success
        **None** on a non-201 response or when
        the response body cannot be loaded
    :rtype: CoreJob or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/job'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    log.debug(f'run job ask: {url} question="{question}"')

    # fall back to the defaults when an optional value
    # is missing or empty
    use_model_name = (
        model_name or "mistral-7b-instruct-v0.1.Q4_K_M.gguf"
    )
    use_embed_name = (
        embed_model_name
        or "sentence-transformers/all-MiniLM-L6-v2"
    )
    use_tags = tags or None
    # previously the generated id was assigned to the
    # ``session_id`` argument and never reached the request
    use_session_id = session_id or uuid.uuid4().hex
    use_derived_session_id = derived_session_id or None

    # src/requests/job/create_job.rs
    use_req = {
        "user_id": user.id,
        # ask worker id = 1
        # gen worker id = 2
        "worker_id": 1,
        # ready for work when state == 1
        "state": 1,
        # ask questions = 1
        # gen answers = 2
        "job_type": 1,
        "ask": {
            "msg": question,
            "model_name": use_model_name,
            "embed_model_name": use_embed_name,
            "collection_id": collection_id,
            "collection_name": collection_name,
            "max_tokens": max_tokens,
            "n_ctx": n_ctx,
            "n_batch": n_batch,
            "rag": {
                "dirs": [],
                "s3": [],
                "caches": [],
                "rss": [],
                "match_docs": match_docs,
                "max_doc_scores": max_doc_scores,
                "min_q_score": min_q_score,
                "min_a_score": min_a_score,
            },
            "data": {},
            "tags": use_tags,
            "session_id": use_session_id,
            "derived_session_id": use_derived_session_id,
        },
    }
    log.debug(
        "starting ai job with config:"
        f"\n{ppj.ppj(use_req)}\n"
    )
    use_json = json.dumps(use_req)
    # the context manager closes the session's pooled
    # connections once the response has been read
    with requests.Session() as s:
        s.headers.update({"Bearer": f"{user.token}"})
        r = s.post(
            url,
            use_json,
            verify=verify,
            cert=(cert_file, key_file),
            timeout=5,
        )
    if r.status_code != 201:
        # note: no job id exists yet on this path - the
        # previous message formatted the builtin ``id`` function
        log.error(
            "\n\n"
            "non-201 response:\n"
            f"  url: {url}\n"
            f"  ca={verify}\n"
            f"  response:\n"
            f"  code: {r.status_code}\n"
            f"  text:\n"
            f"  {r.text}\n"
        )
        return None
    # response keys copied 1:1 into the CoreJob
    # constructor; a missing key is passed as None
    job_fields = (
        "id",
        "user_id",
        "worker_id",
        "job_type",
        "state",
        "status",
        "data",
        "msg",
        "created_at",
        "updated_at",
    )
    try:
        cur_json = json.loads(r.text)
        cur_o = core_job.CoreJob(
            **{
                field: cur_json.get(field, None)
                for field in job_fields
            }
        )
        return cur_o
    except Exception as e:
        log.error(f'failed to run ask job with ex="{e}"')
    return None

search_ai_results(user, data, cfg=None)

search_ai_results

Search for ai results within the database

search for specific ai results using the supported parameters

Parameters:
  • user (CoreUser) –

    authenticated user that is making this request

  • data (dict) –

    request values dictionary

  • cfg (dict, default: None ) –

    optional CoreConfig dictionary

Returns:
  • CoreSearchResultAI | None

    CoreSearchResultAI on success, None on non-success

client_aic/req/ai/search_ai_results.py
def search_ai_results(
    user: core_user.CoreUser,
    data: dict,
    cfg: dict = None,
):
    """
    search_ai_results

    Search for ai results within the database

    search for specific ai results using the
    supported parameters

    :param CoreUser user: authenticated user
        that is making this request
    :param data: request values dictionary
        (posted as the json request body)
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when not
        provided)

    :returns: **CoreSearchResultAI** on success
        **None** on a non-200 response or when
        the response body cannot be loaded
    :rtype: CoreSearchResultAI or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/ai/result/search'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    debug = cfg.get("debug", False)
    verify = tls_utils.get_verify(cfg)
    log.debug(f"search ai result: {url}")
    # the context manager closes the session's pooled
    # connections once the response has been read
    with requests.Session() as s:
        s.headers.update({"Bearer": f"{user.token}"})
        r = s.post(
            url,
            json=data,
            verify=verify,
            cert=(cert_file, key_file),
            timeout=5,
        )
    if r.status_code != 200:
        # note: a search has no single record id - the
        # previous message formatted the builtin ``id`` function
        log.error(
            "\n\n"
            "non-200 response:\n"
            f"  url: {url}\n"
            f"  ca={verify}\n"
            f"  response:\n"
            f"  code: {r.status_code}\n"
            f"  text:\n"
            f"  {r.text}\n"
        )
        return None
    if debug:
        log.info(f"search ai result success - {r.text}")
    try:
        rec_dict = json.loads(r.text)
        cur_o = core_search_result_ai.CoreSearchResultAI()
        cur_o.load_response_dict(rec_dict=rec_dict)
        return cur_o
    except Exception as e:
        log.error(
            f'failed to search ai results with ex="{e}"'
        )
    return None

update_ai_result(user, ai_result, cfg=None)

update_ai_result

update an existing ai result

supports reinforcement learning using human feedback with the reviewed_answer and reviewed_score fields

Parameters:
  • user (CoreUser) –

    authenticated user that is making this request

  • ai_result (CoreResultAI) –

    in-memory object with values to send to the database

  • cfg

    optional CoreConfig dictionary

Returns:
  • CoreResultAI | None

    CoreResultAI on success, None on non-success

client_aic/req/ai/update_ai_result.py
def update_ai_result(
    user: core_user.CoreUser,
    ai_result: core_result_ai.CoreResultAI,
    cfg=None,
):
    """
    update_ai_result

    update an existing ai result

    supports reinforcement learning
    using human feedback with
    the **reviewed_answer** and **reviewed_score** fields

    :param CoreUser user: authenticated user
        that is making this request
    :param ai_result: in-memory object with
        values to send to the database
        (sent as ``ai_result.get_dict()``)
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when not
        provided)

    :returns: **CoreResultAI** on success
        **None** on a non-200 response or when
        the response body cannot be loaded
    :rtype: CoreResultAI or None
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    url = f'https://{cfg["endpoint"]}/ai/result'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    debug = cfg.get("debug", False)
    log.debug(f"update ai result: {url}")
    data = ai_result.get_dict()
    # the context manager closes the session's pooled
    # connections once the response has been read
    with requests.Session() as s:
        s.headers.update({"Bearer": f"{user.token}"})
        r = s.put(
            url,
            json=data,
            verify=verify,
            cert=(cert_file, key_file),
            timeout=5,
        )
    if r.status_code != 200:
        # report the id from the payload being updated - the
        # previous message formatted the builtin ``id`` function
        result_id = (
            data.get("id") if isinstance(data, dict) else None
        )
        log.error(
            "\n\n"
            "non-200 response:\n"
            f"  url: {url}\n"
            f"  ai_result.id: {result_id}\n"
            f"  ca={verify}\n"
            f"  response:\n"
            f"  code: {r.status_code}\n"
            f"  text:\n"
            f"  {r.text}\n"
        )
        return None
    if debug:
        log.info(f"update ai result success - {r.text}")
    try:
        cur_json = json.loads(r.text)
        cur_o = core_result_ai.CoreResultAI()
        cur_o.load_response_dict(rec_dict=cur_json)
        return cur_o
    except Exception as e:
        log.error(
            f'failed to update ai_result with ex="{e}"'
        )
    return None