Source code for awscli_bastion.cache
from dateutil.tz import tzutc
from dateutil.parser import parse
from os.path import isfile
import click
import datetime
import humanize
import json
import os
import pathlib
import sys
class Cache:
    """Manage the bastion-sts credential cache (~/.aws/cli/cache/bastion-sts.json)."""

    def __init__(self):
        # The aws shared cache directory, and the bastion-sts cache file inside it.
        self.aws_shared_cache_path = os.path.join(pathlib.Path.home(), ".aws/cli/cache")
        self.bastion_sts_cache_path = os.path.join(self.aws_shared_cache_path, "bastion-sts.json")

    def does_exist(self):
        """Return whether or not the bastion-sts credential cache exists.

        :return: Whether or not the bastion-sts credential cache file exists.
        :rtype: bool
        """
        return isfile(self.bastion_sts_cache_path)

    def is_expired(self):
        """Return whether or not the bastion-sts credentials are expired.

        A missing cache file is reported as expired.

        :return: Whether or not the bastion-sts credentials are expired.
        :rtype: bool
        :raises KeyError: If the cache file has no ``Expiration`` entry.
        """
        if not self.does_exist():
            return True

        now_dt = datetime.datetime.now(tzutc())
        expiration_dt = parse(self.read()["Expiration"])
        return now_dt > expiration_dt

    def get_expiration(self, human_readable=True):
        """Return how much time until the bastion-sts credentials expire.

        Echoes an error and exits the process with status 1 when the cache
        file has no ``Expiration`` entry.

        :param human_readable: Whether or not to output as human readable.
        :type human_readable: bool
        :return: The ``humanize.naturaltime`` rendering of the time to
            expiration when ``human_readable`` is true, otherwise the raw
            ``now - expiration`` delta.
        :rtype: str or datetime.timedelta
        """
        try:
            expiration_iso = self.read()["Expiration"]
        except KeyError:
            click.echo(
                "The '{}' cache file did not have the 'Expiration' attribute.".format(
                    self.bastion_sts_cache_path
                )
            )
            sys.exit(1)

        expiration_dt = parse(expiration_iso)
        now_dt = datetime.datetime.now(tzutc())
        # now - expiration: negative while the credentials are still valid,
        # positive once they have expired.
        delta = now_dt - expiration_dt
        return humanize.naturaltime(delta) if human_readable else delta

    def write(self, creds):
        """Write json formatted credentials to the bastion-sts cache file.

        The aws shared cache directory is created if it does not exist yet.
        Note that ``creds`` is updated in place with ``"Version": 1`` before
        it is serialized.

        :param creds: bastion-sts short-lived credentials.
        :type creds: dict
        """
        # exist_ok avoids a failure if the directory appears between a check
        # and the creation (and replaces the non-existent os.mkdirs).
        os.makedirs(self.aws_shared_cache_path, exist_ok=True)
        with open(self.bastion_sts_cache_path, 'w') as f:
            creds["Version"] = 1
            json.dump(creds, f, indent=4)

    def read(self):
        """Read json formatted credentials from the bastion-sts cache file.

        :return: The parsed content of the bastion-sts cache file.
        :raises FileNotFoundError: If the cache file does not exist.
        """
        with open(self.bastion_sts_cache_path, 'r') as f:
            return json.load(f)

    def delete(self):
        """Delete the cache files in the aws shared cache directory.

        Every entry of the directory is removed, not only the bastion-sts
        cache file. A missing directory is treated as having nothing to delete.
        """
        try:
            cache_files = os.listdir(self.aws_shared_cache_path)
        except FileNotFoundError:
            cache_files = []

        if not cache_files:
            click.echo("- No cache files to delete.")
            return

        for cache in cache_files:
            os.remove(os.path.join(self.aws_shared_cache_path, cache))
            click.echo("- Deleted the '{}' file.".format(cache))