docker-samples/mailserver/s3-ingest.py

150 lines
5.1 KiB
Python
Raw Normal View History

2024-11-02 20:09:56 +00:00
#!/bin/env python3
import os
import datetime
import hashlib
import hmac
import http.client
import urllib.parse
import logging
import subprocess
import xml.etree.ElementTree as ET
# AWS S3 configuration
# Ideally these would be read from environment variables, but the script is
# run from cron, where those variables are not available, so they are set here.
# NOTE(review): access_key and secret_key are credentials kept in plain text in
# this file — consider restricting who can read it.
# Bucket named in the request path of every S3 call in this script.
bucket_name = "MYMAILBUCKET"
# Value sent as the `prefix` query parameter when listing objects.
prefix = ""
# Used to build the endpoint host (s3.<region>.amazonaws.com) and the
# credential scope of each request signature.
region = 'us-west-2'
# Access key ID placed in the Credential field of the Authorization header.
access_key = ""
# Secret from which the request signing key is derived.
secret_key = ""
# Logging configuration: INFO and above, each record prefixed with a
# second-resolution timestamp and its level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    ``key`` is bytes; ``msg`` is a str and is UTF-8 encoded before hashing.
    """
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode('utf-8'))
    return mac.digest()
def get_signature_key(key, date_stamp, region_name, service_name):
    """Derive the request signing key by chaining HMAC-SHA256 operations.

    The seed is ``'AWS4' + key`` (UTF-8 encoded); it is then re-keyed in turn
    with the date stamp, the region name, the service name and the literal
    ``'aws4_request'``. Returns the final digest as bytes.
    """
    derived = ('AWS4' + key).encode('utf-8')
    for component in (date_stamp, region_name, service_name, 'aws4_request'):
        derived = sign(derived, component)
    return derived
def create_signed_headers(method, host, uri, params, body=''):
    """Build the AWS Signature Version 4 headers for one S3 request.

    Args:
        method: HTTP verb, e.g. ``'GET'`` or ``'DELETE'``.
        host: Host name placed in the signed ``host`` header.
        uri: Request path. It is used verbatim as the canonical URI, so it
            must already be percent-encoded exactly as it will be sent.
        params: Mapping of query parameter names to string values; may be
            empty or ``None``.
        body: Request payload as ``str`` or ``bytes``; empty for requests
            without a body.

    Returns:
        dict with the ``x-amz-date``, ``x-amz-content-sha256`` and
        ``Authorization`` headers.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc)
    amz_date = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')

    def rfc3986(value):
        # SigV4 requires every byte outside A-Z a-z 0-9 - _ . ~ to be
        # percent-encoded and a space to be %20; quote_plus would emit '+'.
        return urllib.parse.quote(value, safe='-_.~')

    # The canonical query string lists the encoded pairs sorted by name
    # (ties broken by value), not in dict insertion order.
    encoded_pairs = sorted(
        (rfc3986(name), rfc3986(value))
        for name, value in (params or {}).items()
    )
    canonical_querystring = '&'.join(
        f'{name}={value}' for name, value in encoded_pairs
    )

    payload = body.encode('utf-8') if isinstance(body, str) else (body or b'')
    payload_hash = hashlib.sha256(payload).hexdigest()

    # x-amz-date and x-amz-content-sha256 are signed alongside host.
    signed_headers = 'host;x-amz-content-sha256;x-amz-date'
    canonical_headers = (
        f'host:{host}\n'
        f'x-amz-content-sha256:{payload_hash}\n'
        f'x-amz-date:{amz_date}\n'
    )
    # canonical_headers already ends in a newline, so the join leaves the
    # blank separator line the canonical request format expects.
    canonical_request = '\n'.join([
        method,
        uri,
        canonical_querystring,
        canonical_headers,
        signed_headers,
        payload_hash,
    ])

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = f'{date_stamp}/{region}/s3/aws4_request'
    request_digest = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = f'{algorithm}\n{amz_date}\n{credential_scope}\n{request_digest}'

    signing_key = get_signature_key(secret_key, date_stamp, region, 's3')
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    authorization_header = (
        f"{algorithm} Credential={access_key}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, Signature={signature}"
    )
    return {
        'x-amz-date': amz_date,
        'x-amz-content-sha256': payload_hash,
        'Authorization': authorization_header,
    }
def make_request(method, uri, params=None, headers=None):
    """Send one HTTPS request to the regional S3 endpoint.

    Args:
        method: HTTP verb.
        uri: Request path, already percent-encoded.
        params: Optional mapping of query parameters appended to ``uri``.
        headers: Optional mapping of request headers.

    Returns:
        Tuple ``(status, data)``: the HTTP status code and the full response
        body as bytes.
    """
    host = f's3.{region}.amazonaws.com'
    full_uri = uri
    if params:
        # Encode with quote (space -> %20) rather than the default quote_plus
        # so the query on the wire uses the same encoding SigV4 signs.
        query_string = urllib.parse.urlencode(
            params, quote_via=urllib.parse.quote
        )
        full_uri = f"{uri}?{query_string}"

    conn = http.client.HTTPSConnection(host)
    try:
        # http.client iterates over the headers mapping, so None must be
        # replaced with an empty dict.
        conn.request(method, full_uri, headers=headers or {})
        response = conn.getresponse()
        return response.status, response.read()
    finally:
        # Close the connection even when the request or the read raises.
        conn.close()
def list_objects():
    """Request the bucket listing (list-type 2) filtered by ``prefix``.

    Issues a single signed GET; it does not follow up with further requests.

    Returns:
        The raw response body (bytes) when the status is 200, otherwise
        ``None`` after logging the error body.
    """
    path = f'/{bucket_name}'
    query = {'list-type': '2', 'prefix': prefix}
    endpoint = f's3.{region}.amazonaws.com'
    signed = create_signed_headers('GET', endpoint, path, query)
    status, payload = make_request('GET', path, query, signed)
    if status != 200:
        logging.error(f"Error listing objects: {payload}")
        return None
    return payload
def download_object(key):
    """Fetch one object from the bucket.

    Args:
        key: Object key, unencoded.

    Returns:
        The object body as bytes when the status is 200, otherwise ``None``
        after logging the error body.
    """
    # The signed path must keep '/' literal inside the key and encode a space
    # as %20; quote_plus would produce %2F and '+', which breaks the signature
    # for keys containing slashes, spaces or plus signs.
    uri = f"/{bucket_name}/{urllib.parse.quote(key, safe='/')}"
    headers = create_signed_headers('GET', f's3.{region}.amazonaws.com', uri, {})
    status, response = make_request('GET', uri, headers=headers)
    if status == 200:
        return response
    logging.error(f"Error downloading {key}: {response}")
    return None
def delete_object(key):
    """Delete one object from the bucket and log the outcome.

    Args:
        key: Object key, unencoded.

    A 204 status is logged as success; any other status is logged as an
    error together with the response body. Nothing is returned.
    """
    # The signed path must keep '/' literal inside the key and encode a space
    # as %20; quote_plus would produce %2F and '+', which breaks the signature
    # for keys containing slashes, spaces or plus signs.
    uri = f"/{bucket_name}/{urllib.parse.quote(key, safe='/')}"
    headers = create_signed_headers('DELETE', f's3.{region}.amazonaws.com', uri, {})
    status, response = make_request('DELETE', uri, headers=headers)
    if status == 204:
        logging.info(f"Deleted {key} from S3")
    else:
        logging.error(f"Error deleting {key}: {response}")
def inject_email(email_content, sendmail_path='/usr/sbin/sendmail'):
    """Pipe a raw message into the local sendmail binary.

    Args:
        email_content: The complete message as bytes, written to the
            process's standard input.
        sendmail_path: Path of the sendmail-compatible executable, which is
            invoked with the ``-t`` option.

    Returns:
        ``True`` when the process exits with status 0, ``False`` when it
        exits non-zero or cannot be started at all.
    """
    try:
        completed = subprocess.run([sendmail_path, '-t'], input=email_content)
    except OSError as exc:
        # Missing or non-executable binary: report it as a failed injection
        # instead of aborting the whole run.
        logging.error(f"Failed to inject email into Postfix: {exc}")
        return False

    if completed.returncode == 0:
        logging.info("Email successfully injected into Postfix")
        return True
    logging.error("Failed to inject email into Postfix")
    return False
def main():
    """List the bucket, inject every object as an email, then delete it.

    An object is deleted only after it was downloaded and handed to
    ``inject_email`` without a reported failure, so a message that could not
    be delivered stays in S3 for a later run.
    """
    # List all objects with the specified prefix
    xml_content = list_objects()
    if not xml_content:
        return

    root = ET.fromstring(xml_content)
    # Extracts the namespace URI from the root tag ("{uri}Name").
    namespace = {'ns': root.tag.split('}')[0].strip('{')}
    for contents in root.findall('.//ns:Contents', namespace):
        key = contents.find('ns:Key', namespace).text
        logging.info(f"Processing {key}")
        email_content = download_object(key)
        if not email_content:
            continue
        # Only an explicit False counts as failure; an injector that reports
        # no status (returns None) is treated as having succeeded.
        if inject_email(email_content) is False:
            logging.error(f"Not deleting {key}: email injection failed")
            continue
        delete_object(key)
def extract_keys_from_xml(xml_content):
    """Return the text of every ``Key`` element in an XML document.

    Args:
        xml_content: XML document as ``str`` or ``bytes``. Empty or ``None``
            input yields an empty list.

    Returns:
        List of key strings in document order.

    Raises:
        xml.etree.ElementTree.ParseError: if the content is not valid XML.
    """
    if not xml_content:
        return []
    root = ET.fromstring(xml_content)
    # Compare local names only: namespaced documents expose tags as
    # "{namespace-uri}Key", which a plain iter('Key') would never match.
    return [
        elem.text
        for elem in root.iter()
        if elem.tag.rsplit('}', 1)[-1] == 'Key'
    ]
# Run the ingest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()