Authenticate and New User Creation
authenticate
Helper for authenticating as a new or existing user, based on either the username/password/email arguments or the CoreConfig cfg dictionary. Credentials must be supplied by one or the other.
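| Parameters: | |
|---|---|
| username | optional username |
| email | optional email |
| password | optional password |
| auto_create | optional flag for creating a user if they do not exist already (default: True) |
| cfg | optional CoreConfig dictionary |

| Returns: | |
|---|---|
| CoreUser | on success (None on non-success) |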
client_aic/authenticate.py
def authenticate(
    username: str = None,
    email: str = None,
    password: str = None,
    auto_create: bool = True,
    cfg: dict = None,
):
    """
    authenticate

    helper for authenticating as a new or
    existing user based off either the
    username/password/email or **CoreConfig**
    cfg dictionary. requires using
    credentials from one or the other.

    any argument that is not set falls back to
    the matching key in the cfg ``"user"``
    dictionary (``"u"`` = username,
    ``"p"`` = password, ``"e"`` = email).

    :param username: optional username
    :param email: optional email
    :param password: optional password
    :param auto_create: optional flag for
        creating a user if they do not exist
        already and the default is **True**
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when
        not set)

    :returns: **CoreUser** if success
        **None** if non-success
    :rtype: CoreUser
    """
    if not cfg:
        cfg = get_cfg.get_cfg()
    # "or {}" also covers a cfg whose "user" entry
    # is explicitly None (len(None) would raise)
    cfg_user = cfg.get("user") or {}
    # explicit arguments win, the cfg values
    # are only the fallback
    use_username = (
        username if username else cfg_user.get("u", username)
    )
    use_password = (
        password if password else cfg_user.get("p", password)
    )
    use_email = email if email else cfg_user.get("e", email)
    user = None
    try:
        # NOTE(review): this first attempt passes the
        # caller-supplied email/password, not the cfg
        # fallbacks - presumably so login can use the
        # locally cached credentials when nothing was
        # passed in; confirm before changing
        user = login.login(
            email=email,
            password=password,
            cfg=cfg,
            # force = support for saving a new token
            # locally again in case the old one expired
            force=False,
        )
        if not user:
            if not use_username:
                # no username anywhere: generate one
                use_uuid = str(uuid.uuid4()).replace(
                    "-", ""
                )
                use_username = f"rt.2023.{use_uuid}"
            if auto_create:
                log.debug(
                    f"creating user email={use_email}"
                )
                create_user.create_user(
                    username=use_username,
                    password=use_password,
                    email=use_email,
                    cfg=cfg,
                )
            log.debug(
                "trying to login with force "
                f"user with email={use_email}"
            )
            user = login.login(
                email=use_email,
                password=use_password,
                cfg=cfg,
                # force = support for saving a new token
                # locally again in case the old one expired
                force=True,
            )
            if not user:
                # without this guard the user.id lookup
                # below raises an AttributeError that is
                # swallowed by the broad except
                log.error(
                    "failed to login user with "
                    f"email={use_email}"
                )
                return None
            log.debug("validating access with token")
            found_user = get_user.get_user(
                id=user.id,
                user=user,
                cfg=cfg,
            )
            if not found_user:
                log.error(
                    f"failed to get user.id={user.id}"
                )
                return None
        # end of login
        log.debug("done")
    except Exception as e:
        # report the email that was actually used
        # (the email argument is None when the
        # credentials came from the cfg)
        log.error(
            f'failed to auth user={use_email} with e="{e}"'
        )
    return user
login
Log the user in by email and password; on success, save the credentials to the creds_file_path (the AI_CREDS_FILE environment variable is supported too).
Logs a user into the AI core REST API and saves the CoreUser's temporary credentials locally to the creds_file path.
Optional Settings with Env Vars
Disable saving local credentials json file
By default, your credentials are saved as a local JSON file to make subsequent requests faster. Disabling this feature forces the client to log in again to get a valid token (which is a more secure mode for production workloads):
export DISABLE_CRED_CACHE=1
Change the path to the credentials file
export AI_CREDS_FILE=path_to_creds.json
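| Parameters: | |
|---|---|
| email | user's email address |
| password | user's password |
| cfg | optional CoreConfig dictionary |
| creds_file_path | path to a file where the user's authenticated credentials are saved locally for speeding up subsequent api requests |
| force | flag for overwriting the creds_file_path, like for when the CoreUser's token expires |

| Returns: | |
|---|---|
| CoreUser or None | CoreUser on success, None on non-success |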
client_aic/req/auth/login.py
def login(
    email: str,
    password: str,
    cfg: dict = None,
    creds_file_path: str = None,
    force: bool = False,
):
    """
    login

    login the user by the **email** and **password**
    on success save the credentials to the
    **creds_file_path** (environment
    variable **AI_CREDS_FILE** supported too)

    log a user into the ai core rest api
    and save the **CoreUser**
    temporary credentials locally to the
    **creds_file** path

    when both **email** and **password** are
    missing, previously saved credentials are
    loaded from the credentials file (if it
    exists) without calling the api

    **Optional Settings with Env Vars**

    **Disable saving local credentials json file**

    By default the credentials are saved as a
    local json file for faster requests.
    Disabling this feature forces the client
    to re-login to get a valid token (which
    is a more secure mode for production workloads):

    ```bash
    export DISABLE_CRED_CACHE=1
    ```

    **Change the path to the credentials file**

    ```bash
    export AI_CREDS_FILE=path_to_creds.json
    ```

    :param email: user's email address
    :param password: user's password
    :param cfg: optional **CoreConfig** dictionary
        (loaded with ``get_cfg.get_cfg()`` when
        not set)
    :param creds_file_path: path to a file
        where the user's authenticated credentials
        are saved locally for speeding up
        subsequent api requests (in this function
        it is only used for reading the saved
        credentials)
    :param force: flag for overwriting the
        **creds_file_path** like for
        when the **CoreUser**'s token expires
        (NOTE: not referenced by this function
        body at the moment)

    :returns: **CoreUser** on success
        **None** on non-success
    :rtype: CoreUser or None
    """
    # resolve the cfg first: the creds file lookup
    # below reads from it, so the default cfg=None
    # must be replaced before cfg.get is called
    if not cfg:
        cfg = get_cfg.get_cfg()

    # if no email/password
    # it's in the creds.json
    # or its not a valid api request
    # (env vars are loaded upstream
    # CoreConfig **cfg** dictionary)
    if not email and not password:
        # fall back to the platform home directory
        # when HOME is not set instead of building
        # a literal "None/.redten" path
        home_dir = os.getenv("HOME") or os.path.expanduser(
            "~"
        )
        def_creds_path = f"{home_dir}/.redten/creds.json"
        # precedence: argument > env var > cfg > default
        creds_path = creds_file_path or os.getenv(
            "AI_CREDS_FILE",
            cfg.get("ai_creds_file", def_creds_path),
        )
        if os.path.exists(creds_path):
            with open(creds_path, "r") as fp:
                user_json = json.load(fp)
            user = core_user.CoreUser(
                id=user_json.get("id", -2),
                email=user_json.get("email", "not found"),
                state=user_json.get("state", -2),
                verified=user_json.get("verified", -2),
                role=user_json.get("role", "not found"),
                token=user_json.get("token", "not found"),
                msg=user_json.get("msg", "not found"),
            )
            log.debug(f"using existing creds: {creds_path}")
            return user
        # by here the cfg was already parsed by the
        # authenticate api call so the email/password
        # are required
        log.error(
            "invalid login - missing email and password"
        )
        return None

    if not email:
        log.error("invalid login - " "missing email")
        return None
    if not password:
        log.error("invalid login - " "missing password")
        return None

    url = f'https://{cfg["endpoint"]}/login'
    (cert_file, key_file) = tls_utils.get_certs(cfg)
    verify = tls_utils.get_verify(cfg)
    debug = cfg.get("debug", False)
    data = {
        "email": email,
        "password": password,
    }
    # copy of the payload that is safe to log:
    # never write the plaintext password to the logs
    log_data = {
        "email": email,
        "password": "***",
    }
    log.debug(f"login: {url} data={log_data} ca={verify}")
    # context manager closes the session's
    # connections once the response was read
    with requests.Session() as s:
        r = s.post(
            url,
            json.dumps(data),
            verify=verify,
            cert=(cert_file, key_file),
            timeout=10,
        )

    # the api answers a successful login with 201
    if r.status_code != 201:
        if debug:
            log.error(
                "\n\n"
                "failed login:\n"
                f"url: {url}\n"
                f"data: {log_data}\n"
                f"ca: {verify}\n"
                f"response:\ncode: {r.status_code}\n"
                f"text:\n{r.text}\n"
            )
        if "invalid password" in r.text:
            log.error(f"invalid password for {email}")
        else:
            test_str = (
                "user does not exist " f"with email={email}"
            )
            if test_str in r.text:
                # likely not an error
                log.debug(f"no user email={email}")
            else:
                log.error(
                    f"no user email={email} "
                    f"response={r.text}"
                )
        return None

    # NOTE(review): the response body holds the token
    # that is parsed below, so this debug line writes
    # it to the logs - confirm this is acceptable
    log.debug(f"login success - {r.text}")
    try:
        user_json = json.loads(r.text)
        user = core_user.CoreUser(
            id=user_json.get("user_id", -2),
            email=user_json.get("email", "not found"),
            state=user_json.get("state", -2),
            verified=user_json.get("verified", -2),
            role=user_json.get("role", "not found"),
            token=user_json.get("token", "not found"),
            msg=user_json.get("msg", "not found"),
        )
        # disable saving creds
        # if the environment variable
        # export DISABLE_CRED_CACHE=1
        if os.getenv("DISABLE_CRED_CACHE", "0") == "0":
            user.save_creds()
        return user
    except Exception as e:
        log.error(f'failed to login with ex="{e}"')
    return None