Python Examples
Provides examples that show you how to connect to an S3 storage service and perform bucket and object operations in Python.
NOTICE: The S3 gateway is included in the EEP 6.0.0 through
EEP 8.0.0 repositories. The S3 gateway is not
supported in HPE Ezmeral Data Fabric 7.0.0 and later. HPE Ezmeral Data Fabric 7.0.0 introduces a native object storage solution.
For more information, see HPE Ezmeral Data Fabric Object Store.
This example has the following dependencies:
- boto3 1.15.13
- botocore 1.16.26
The examples show the code for connecting to the service and for performing the following operations:
- list buckets
- create a bucket
- delete a bucket
- verify that a bucket exists
- list files
- upload a file
- delete a file
- verify that a file exists
Connection
import boto3
import urllib3
from botocore.client import Config
from operations import demo
if __name__ == '__main__':
    # Connection settings for the demo run: endpoint, credentials, and the
    # bucket/object names that are handed to demo().
    host = "http://localhost:9000"
    username = "minioadmin"
    password = "minioadmin"
    bucketName = "test"
    file = "file"

    # Silence urllib3's InsecureRequestWarning, since the client below is
    # created with verify=False.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Keyword arguments for boto3.client, gathered in one place.
    client_options = {
        'endpoint_url': host,
        'aws_access_key_id': username,
        'aws_secret_access_key': password,
        'config': Config(signature_version='s3v4'),
        'region_name': 'us-east-1',
        'verify': False,
    }
    s3 = boto3.client('s3', **client_options)

    # Run the bucket/object walkthrough against the connected client.
    demo(s3, bucketName, file)
Operations
from datetime import datetime
from botocore.exceptions import ClientError
def demo(s3, bucket_name, file):
    """Run the full walkthrough: list, create, upload, read, clean up.

    Args:
        s3: S3 client passed through to every helper.
        bucket_name: Name of the bucket the demo creates and later deletes.
        file: Object key that is uploaded, read back, and removed.
    """
    # Each stage is a group of steps; a blank separator is printed before
    # every stage, and the steps run in the order listed.
    stages = (
        (lambda: list_buckets(s3),),
        (lambda: create_bucket_if_not_exists(s3, bucket_name),
         lambda: list_buckets(s3)),
        (lambda: upload_file_if_not_exists(s3, bucket_name, file),
         lambda: list_folder(s3, bucket_name)),
        (lambda: read_file(s3, bucket_name, file),),
        (lambda: delete_all(s3, bucket_name),
         lambda: list_folder(s3, bucket_name)),
        (lambda: delete_bucket_if_exists(s3, bucket_name),
         lambda: list_buckets(s3)),
    )
    for stage in stages:
        print("\n")
        for step in stage:
            step()
def list_buckets(s3):
    """Print a "Buckets:" header followed by one bucket name per line.

    Args:
        s3: Client whose ``list_buckets()`` result has a ``'Buckets'`` list
            of entries carrying a ``'Name'`` key.
    """
    print("Buckets:")
    response = s3.list_buckets()
    # Generator keeps the lookup lazy, so names print one at a time.
    for bucket_name in (entry['Name'] for entry in response['Buckets']):
        print(bucket_name)
def create_bucket_if_not_exists(s3, bucket_name):
    """Create ``bucket_name`` unless the existence check reports it exists.

    Args:
        s3: Client used for the existence check and for ``create_bucket``.
        bucket_name: Name of the bucket to create.
    """
    already_present = check_if_bucket_exists(s3, bucket_name)
    if not already_present:
        print("Bucket does not exist, creating...")
        s3.create_bucket(Bucket=bucket_name)
    else:
        print("Bucket exists")
def check_if_bucket_exists(s3, bucket_name):
    """Return whether ``bucket_name`` exists according to ``head_bucket``.

    Args:
        s3: Client exposing ``head_bucket(Bucket=...)``.
        bucket_name: Name of the bucket to probe.

    Returns:
        ``True`` when ``head_bucket`` succeeds, or fails with anything other
        than a not-found error; ``False`` when the error code says the
        bucket is missing.
    """
    print("Checking if bucket '" + bucket_name + "' exists")
    try:
        s3.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        # Compare codes as strings: the code may be symbolic (for example
        # "NoSuchBucket" or "AccessDenied"), and int() on those would raise
        # ValueError instead of answering the question.
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        return error_code not in ('404', 'NoSuchBucket', 'NotFound')
    return True
def delete_bucket_if_exists(s3, bucket_name):
    """Delete ``bucket_name`` if ``head_bucket`` succeeds for it.

    Any ``ClientError`` from ``head_bucket`` is reported as "Bucket does not
    exist". A ``ClientError`` from ``delete_bucket`` is reported separately,
    so a failed delete is not mistaken for a missing bucket.

    Args:
        s3: Client exposing ``head_bucket`` and ``delete_bucket``.
        bucket_name: Name of the bucket to delete.
    """
    print("Checking if bucket '" + bucket_name + "' exists")
    # Keep the existence probe in its own try so only its failure produces
    # the "does not exist" message.
    try:
        s3.head_bucket(Bucket=bucket_name)
    except ClientError:
        print("Bucket does not exist")
        return

    print("Bucket exists, deleting...")
    try:
        s3.delete_bucket(Bucket=bucket_name)
    except ClientError as err:
        # Stay best-effort (no crash), but say what actually went wrong.
        print("Failed to delete bucket '" + bucket_name + "': " + str(err))
def list_folder(s3, bucket_name):
    """Print every object key in ``bucket_name``, or "Is empty" if none.

    The listing goes through the ``list_objects`` paginator so that buckets
    whose contents span more than one response page are printed in full,
    instead of only the first page.

    Args:
        s3: Client exposing ``get_paginator('list_objects')``.
        bucket_name: Name of the bucket to list.
    """
    print("Listing bucket " + bucket_name + ":")
    found_any = False
    pages = s3.get_paginator('list_objects').paginate(Bucket=bucket_name)
    for page in pages:
        # A page for an empty bucket carries no 'Contents' entry.
        for entry in page.get('Contents', []):
            found_any = True
            print(entry['Key'])
    if not found_any:
        print("Is empty")
def delete_all(s3, bucket_name):
    """Delete every object in ``bucket_name``.

    Keys are collected through the ``list_objects`` paginator so that objects
    beyond the first response page are removed as well; otherwise the bucket
    would be left non-empty.

    Args:
        s3: Client exposing ``get_paginator('list_objects')`` and
            ``delete_object``.
        bucket_name: Name of the bucket to empty.
    """
    print("Cleaning all form bucket: " + bucket_name)
    pages = s3.get_paginator('list_objects').paginate(Bucket=bucket_name)
    # Gather keys first so deletions do not interleave with the listing.
    keys = [entry['Key'] for page in pages for entry in page.get('Contents', [])]
    for key in keys:
        s3.delete_object(Bucket=bucket_name, Key=key)
def upload_file_if_not_exists(s3, bucket_name, file_name):
    """Upload a timestamped greeting as ``file_name`` unless it already exists.

    Args:
        s3: Client used for the existence check and for ``put_object``.
        bucket_name: Bucket that receives the object.
        file_name: Key of the object to create.
    """
    if not check_if_file_exist(s3, bucket_name, file_name):
        # Payload is a short greeting stamped with the current local time.
        timestamp = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
        payload = "Hello world on " + timestamp + "!"
        print("Uploading '" + file_name + "' to '" + bucket_name + "' with data:\n" + payload)
        s3.put_object(Body=payload.encode('ascii'), Bucket=bucket_name, Key=file_name)
    else:
        print("File '" + file_name + "' already exists in bucket '" + bucket_name + "'")
def read_file(s3, bucket_name, file_name):
    """Fetch ``file_name`` from ``bucket_name`` and print it as UTF-8 text.

    Args:
        s3: Client exposing ``get_object(Bucket=..., Key=...)``.
        bucket_name: Bucket that holds the object.
        file_name: Key of the object to read.
    """
    print("Reading file '" + file_name + "' from bucket '" + bucket_name + "'")
    response = s3.get_object(Bucket=bucket_name, Key=file_name)
    print("Data:")
    raw_bytes = response['Body'].read()
    text = raw_bytes.decode('utf-8')
    print(text)
def check_if_file_exist(s3, bucket_name, file):
    """Return whether object ``file`` exists in ``bucket_name``.

    Args:
        s3: Client exposing ``head_object(Bucket=..., Key=...)``.
        bucket_name: Bucket to look in.
        file: Key of the object to probe.

    Returns:
        ``True`` when ``head_object`` succeeds, or fails with anything other
        than a not-found error; ``False`` when the error code says the
        object is missing.
    """
    print("Checking if file '" + file + "' exists in bucket '" + bucket_name + "'")
    try:
        s3.head_object(Bucket=bucket_name, Key=file)
    except ClientError as e:
        # Compare codes as strings: the code may be symbolic (for example
        # "NoSuchKey" or "AccessDenied"), and int() on those would raise
        # ValueError instead of answering the question.
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        return error_code not in ('404', 'NoSuchKey', 'NotFound')
    return True