
Cloud Asset Inventory: Querying Resource State & Compliance

Why Cloud Asset Inventory Is Needed

As the resource hierarchy scales, maintaining an accurate view of all resources becomes nearly impossible:

Organization with 1000 projects, 100,000 resources:
- Which VMs have external IPs? (Compliance violation if unexpected)
- Who has Owner role? (Security audit)
- Which resources in soft-delete state? (Quota tracking)
- Which Compute Engine resources do not comply with security policy?
- What's the current state of resource X? (Troubleshooting)

Manual approaches fail:

  • Querying each service individually (slow, incomplete)
  • Running gcloud commands (not scalable)
  • Parsing audit logs (reactive, not proactive)

Cloud Asset Inventory provides:

  • Unified view of all resources (200+ resource types)
  • Queryable API with filtering/expression support
  • Real-time resource state (live queries) + historical (via exports)
  • Compliance tooling (Policy Analyzer, Security Command Center integration)

Architecture

Cloud Asset Inventory maintains two data stores:

1. Real-time Asset Metadata

Organization → Folder → Project → Resources

Cloud Asset Inventory (indexed database)

Query API

Characteristics:

  • Live resource state
  • Queryable within seconds
  • Regional storage (depends on asset location)
  • Full hierarchical context

2. Historical Exports

Real-time data → BigQuery / Cloud Storage exports

Historical analysis
Trend detection
Compliance auditing

Characteristics:

  • Full historical record
  • Queryable via BigQuery SQL
  • Custom retention policies
  • Cost: Storage + BigQuery query

Query Patterns

Pattern 1: Find all resources of a type

bash
# Find all Compute Engine instances
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --asset-types=compute.googleapis.com/Instance

# Find all Cloud Storage buckets
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --asset-types=storage.googleapis.com/Bucket

# Find multiple types
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --asset-types=compute.googleapis.com/Instance,storage.googleapis.com/Bucket

Pattern 2: Filter by resource attributes

bash
# Find VMs with external IPs (potential security issue)
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --asset-types=compute.googleapis.com/Instance \
  --query="networkInterfaces.accessConfigs.natIP:*"

# Find resources in specific project
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --query="project:my-project-prod"

# Find resources by location
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --query="location:us-central1"

Pattern 3: Compliance auditing

python
from google.cloud import asset_v1

def audit_external_ips(org_id):
    """Find all VMs with unexpected external IPs"""
    
    client = asset_v1.AssetServiceClient()
    
    # Query VMs with external IPs
    query = "networkInterfaces.accessConfigs.natIP:*"
    
    request = asset_v1.SearchAllResourcesRequest(
        scope=f"organizations/{org_id}",
        asset_types=["compute.googleapis.com/Instance"],
        query=query
    )
    
    findings = []
    for result in client.search_all_resources(request=request):
        # Check if external IP is expected
        if not is_external_ip_allowed(result):
            findings.append({
                "resource": result.name,
                "project": result.project,
                "issue": "Unexpected external IP"
            })
    
    return findings

def audit_owner_role(org_id):
    """Find all users/service accounts with Owner role"""
    
    client = asset_v1.AssetServiceClient()
    
    # Query IAM bindings
    request = asset_v1.SearchAllIamPoliciesRequest(
        scope=f"organizations/{org_id}",
        query='roles/owner'
    )
    
    findings = []
    for result in client.search_all_iam_policies(request=request):
        # Bindings live on the policy attached to each search result
        for binding in result.policy.bindings:
            if binding.role == "roles/owner":
                for member in binding.members:
                    findings.append({
                        "resource": result.resource,
                        "member": member,
                        "role": binding.role
                    })
    
    return findings
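
The `is_external_ip_allowed` helper called in `audit_external_ips` is not defined in the code above. One possible shape is sketched here, assuming an allowlist of projects plus an opt-in label. Both `ALLOWED_PROJECTS` and the `allow-external-ip` label are hypothetical conventions invented for this illustration, not part of Cloud Asset Inventory.

```python
from types import SimpleNamespace

# Hypothetical allowlist: projects where public-facing VMs are expected.
ALLOWED_PROJECTS = {"projects/111111111111"}

# Hypothetical opt-in label that a reviewer sets on approved VMs.
APPROVAL_LABEL = "allow-external-ip"


def is_external_ip_allowed(result):
    """Return True if the VM in this search result may have an external IP.

    `result` only needs `.project` and `.labels` attributes.
    """
    if result.project in ALLOWED_PROJECTS:
        return True
    labels = dict(result.labels or {})
    return labels.get(APPROVAL_LABEL) == "true"


# Stand-ins for search results, for illustration only.
approved = SimpleNamespace(
    project="projects/222222222222",
    labels={"allow-external-ip": "true"},
)
unapproved = SimpleNamespace(project="projects/222222222222", labels={})
in_allowed_project = SimpleNamespace(project="projects/111111111111", labels={})

print(is_external_ip_allowed(approved))
print(is_external_ip_allowed(unapproved))
```

Keeping the decision in one small function means the allowlist can later move to a config file or a database without touching the audit loop.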

Drift Detection

Cloud Asset Inventory enables automated compliance checking:

python
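from google.cloud import asset_v1

def detect_resource_drift(project_id, resource_type, resource_id, expected_config):
    """Detect if resource drifted from expected state"""
    
    client = asset_v1.AssetServiceClient()
    
    # Get current asset state; content_type=RESOURCE is required,
    # otherwise the returned assets carry no resource data
    request = asset_v1.ListAssetsRequest(
        parent=f"projects/{project_id}",
        asset_types=[resource_type],
        content_type=asset_v1.ContentType.RESOURCE,
    )
    
    actual_config = None
    for asset in client.list_assets(request=request):
        # asset.name is the full resource name (//compute.googleapis.com/...)
        if asset.name == resource_id:
            # resource.data holds the resource's fields as a mapping
            actual_config = dict(asset.resource.data)
            break
    
    if actual_config is None:
        return {"status": "MISSING", "drift": expected_config}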
    
    # Compare fields
    drifts = {}
    for key, expected_value in expected_config.items():
        actual_value = actual_config.get(key)
        if actual_value != expected_value:
            drifts[key] = {
                "expected": expected_value,
                "actual": actual_value
            }
    
    if drifts:
        return {"status": "DRIFTED", "drifts": drifts}
    else:
        return {"status": "COMPLIANT"}
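
The field comparison above only looks at top-level keys, while real resource configurations are usually nested. A minimal sketch of a recursive comparison that reports drift by dotted path follows. The function name `diff_config` and the sample values are illustrative assumptions, not part of any Google Cloud library.

```python
def diff_config(expected, actual, prefix=""):
    """Return {dotted_path: {"expected": ..., "actual": ...}} for mismatches.

    Only keys present in `expected` are checked, so extra fields on the
    live resource are ignored.
    """
    drifts = {}
    for key, expected_value in expected.items():
        path = f"{prefix}{key}"
        actual_value = actual.get(key) if isinstance(actual, dict) else None
        if isinstance(expected_value, dict):
            # Recurse into nested blocks, treating a missing block as empty.
            drifts.update(diff_config(expected_value, actual_value or {}, f"{path}."))
        elif actual_value != expected_value:
            drifts[path] = {"expected": expected_value, "actual": actual_value}
    return drifts


# Made-up configuration values for illustration only.
expected = {
    "canIpForward": False,
    "shieldedInstanceConfig": {"enableSecureBoot": True},
}
actual = {
    "canIpForward": False,
    "shieldedInstanceConfig": {"enableSecureBoot": False},
    "status": "RUNNING",
}

print(diff_config(expected, actual))
```

Checking only the keys named in `expected` keeps the baseline small: it lists the fields the policy cares about instead of mirroring the whole resource.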

IAM Policy Auditing

bash
# Find all bindings that grant a specific role
gcloud asset search-all-iam-policies \
  --scope=organizations/ORG_ID \
  --query='policy:roles/iam.serviceAccountUser'

# Find the bindings that include a specific member
gcloud asset search-all-iam-policies \
  --scope=organizations/ORG_ID \
  --query='policy:user@example.com'

# Export project-level IAM policies to Cloud Storage
# (use --bigquery-table instead of --output-path to load into BigQuery)
gcloud asset export --project=EXPORT_PROJECT \
  --content-type=iam-policy \
  --output-path=gs://asset-export-bucket/iam-policies.json \
  --asset-types=cloudresourcemanager.googleapis.com/Project

Historical Analysis via BigQuery

sql
-- Query historical asset data
-- (after exporting Cloud Asset Inventory to BigQuery)

SELECT
  TIMESTAMP(DATE(timestamp)) as date,
  asset_type,
  COUNT(*) as resource_count
FROM `project.dataset.cloudresourcemanager_assets_YYYYMM`
WHERE asset_type = 'compute.googleapis.com/Instance'
GROUP BY date, asset_type
ORDER BY date DESC;

-- Detect newly created resources
SELECT
  name,
  display_name,
  create_time
FROM `project.dataset.compute_instances_YYYYMM`
WHERE DATE(create_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
ORDER BY create_time DESC;

-- Find resources without required labels
SELECT
  name,
  display_name,
  STRUCT(
    JSON_EXTRACT_SCALAR(resource.data, '$.labels.environment') as environment,
    JSON_EXTRACT_SCALAR(resource.data, '$.labels.team') as team
  ) as labels
FROM `project.dataset.compute_instances_YYYYMM`
WHERE DATE(timestamp) = CURRENT_DATE()
  AND (
    JSON_EXTRACT_SCALAR(resource.data, '$.labels.environment') IS NULL
    OR JSON_EXTRACT_SCALAR(resource.data, '$.labels.team') IS NULL
  );
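
The same required-label check can also be run against an export written to Cloud Storage, without going through BigQuery. The sketch below assumes the export is newline-delimited JSON with one asset per line and the resource fields under `resource.data`; that layout, the `find_unlabeled` name, and the sample records are assumptions made for illustration.

```python
import json

REQUIRED_LABELS = ("environment", "team")


def find_unlabeled(ndjson_text, required=REQUIRED_LABELS):
    """Return [(asset name, missing label keys)] for assets lacking labels."""
    findings = []
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines between records
        asset = json.loads(line)
        data = (asset.get("resource") or {}).get("data") or {}
        labels = data.get("labels") or {}
        missing = [key for key in required if key not in labels]
        if missing:
            findings.append((asset["name"], missing))
    return findings


# Made-up records for illustration only.
sample = "\n".join(json.dumps(record) for record in [
    {"name": "vm-a", "resource": {"data": {"labels": {"environment": "prod", "team": "payments"}}}},
    {"name": "vm-b", "resource": {"data": {"labels": {"environment": "dev"}}}},
    {"name": "vm-c", "resource": {"data": {}}},
])

print(find_unlabeled(sample))
```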

Security Command Center Integration

python
import uuid
from datetime import datetime, timezone

from google.cloud import securitycenter_v1

def create_compliance_finding(org_id, resource_name, issue):
    """Create Security Command Center finding from compliance check"""
    
    client = securitycenter_v1.SecurityCenterClient()
    
    # Findings are created under an existing SCC source
    parent = f"organizations/{org_id}"
    source_name = f"{parent}/sources/FINDING_SOURCE_ID"
    
    finding = securitycenter_v1.Finding(
        state=securitycenter_v1.Finding.State.ACTIVE,
        resource_name=resource_name,
        category="COMPLIANCE_CHECK",  # free-form label; choose your own taxonomy
        event_time=datetime.now(timezone.utc),
        finding_class=securitycenter_v1.Finding.FindingClass.MISCONFIGURATION,
        severity=securitycenter_v1.Finding.Severity.HIGH,
        source_properties={
            "issue": issue,
            "recommendation": "Fix configuration"
        }
    )
    
    client.create_finding(
        parent=source_name,
        finding_id=uuid.uuid4().hex,  # required: 1-32 alphanumeric characters
        finding=finding
    )
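
Security Command Center expects the caller to supply a finding ID (alphanumeric, at most 32 characters) when a finding is created. One option for a scheduled audit is to derive that ID from the resource and the issue, so that repeated runs of the same check map to the same ID. The sketch below assumes that approach; `stable_finding_id` is a hypothetical helper name, and handling of an ID that already exists is left out.

```python
import hashlib


def stable_finding_id(resource_name, issue):
    """Derive a 32-character alphanumeric ID from the resource and issue."""
    digest = hashlib.sha256(f"{resource_name}|{issue}".encode("utf-8")).hexdigest()
    # A hex digest contains only 0-9 and a-f, so any prefix stays alphanumeric.
    return digest[:32]


# Made-up resource name for illustration only.
resource = "//compute.googleapis.com/projects/demo/zones/us-central1-a/instances/vm-1"
first = stable_finding_id(resource, "Unexpected external IP")
second = stable_finding_id(resource, "Unexpected external IP")
other = stable_finding_id(resource, "Missing label")

print(first == second, first != other, len(first))
```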

Scalability Considerations

Large-scale queries

bash
# For large orgs, queries may take time
# Use filters to narrow scope

# ❌ Too broad
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID

# ✅ Narrower
gcloud asset search-all-resources \
  --scope=organizations/ORG_ID \
  --asset-types=compute.googleapis.com/Instance \
  --query="location:us-central1"

Export vs Query Trade-offs

Use Real-time Query API when:
- Need current state
- One-off queries
- < 100k resources

Use BigQuery Exports when:
- Historical analysis
- Recurring queries (cost-effective)
- Large-scale compliance reports
- Trend analysis

Cost Considerations

Cloud Asset Inventory:
- Real-time queries: Free (included)
- Exports: $1 per 1M resources per month
- BigQuery: Standard query pricing ($6.25 per TB)

Example org with 100k resources:
- Exports: $0.10/month
- Queries: 10 queries × $0.01 per query (est) = $0.10/month
- Total: ~$0.20/month (very cheap)
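
The arithmetic behind this example can be written out as a small estimator. It is only a sketch that reuses the rates quoted in this section as constants (the per-query figure is the rough estimate from the example, not a published price); `estimate_monthly_cost` is a name chosen for illustration.

```python
# Rates as quoted in this section (USD).
EXPORT_RATE_PER_MILLION = 1.00  # per 1M resources per month
EST_COST_PER_QUERY = 0.01       # rough per-query estimate used in the example


def estimate_monthly_cost(resource_count, queries_per_month):
    """Return the export, query, and total monthly cost in USD."""
    export_cost = resource_count / 1_000_000 * EXPORT_RATE_PER_MILLION
    query_cost = queries_per_month * EST_COST_PER_QUERY
    return {
        "export": round(export_cost, 2),
        "queries": round(query_cost, 2),
        "total": round(export_cost + query_cost, 2),
    }


# The example org from this section: 100k resources, 10 queries per month.
print(estimate_monthly_cost(100_000, 10))
```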

Monitoring via Policy Analyzer

Policy Analyzer detects IAM policy misconfigurations:

python
def analyze_iam_policy(org_id):
    """Check for IAM bindings that grant access to the public"""
    
    from google.cloud import asset_v1
    
    client = asset_v1.AssetServiceClient()
    
    # Ask which bindings in the organization include the public principal
    query = asset_v1.IamPolicyAnalysisQuery(
        scope=f"organizations/{org_id}",
        identity_selector=asset_v1.IamPolicyAnalysisQuery.IdentitySelector(
            identity="allUsers"  # Public access
        )
    )
    
    # The client takes a request object that wraps the analysis query
    response = client.analyze_iam_policy(
        request=asset_v1.AnalyzeIamPolicyRequest(analysis_query=query)
    )
    
    # Each result is a binding that matches the public identity
    for result in response.main_analysis.analysis_results:
        print(f"⚠️  {result.attached_resource_full_name} allows public access")
        if not result.fully_explored:
            # fully_explored reports analysis completeness, not exposure
            print("    (analysis incomplete for this binding)")

Automation Patterns

python
# Continuous compliance checking

def schedule_compliance_audit():
    """Run compliance checks on schedule"""
    
    from google.cloud import scheduler_v1
    
    # Create Cloud Scheduler job
    scheduler_client = scheduler_v1.CloudSchedulerClient()
    
    parent = f"projects/PROJECT_ID/locations/us-central1"
    
    job = scheduler_v1.Job(
        name=f"{parent}/jobs/compliance-audit",
        description="Daily compliance audit",
        schedule="0 2 * * *",  # 2 AM daily
        time_zone="UTC",
        http_target=scheduler_v1.HttpTarget(
            uri="https://us-central1-PROJECT_ID.cloudfunctions.net/audit",
            http_method=scheduler_v1.HttpMethod.POST
        )
    )
    
    scheduler_client.create_job(parent=parent, job=job)

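# Cloud Function for compliance audit
# Relies on audit_external_ips and audit_owner_role from the compliance
# auditing pattern, plus ORG_ID and notify_security_team defined elsewhere.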
def compliance_audit(request):
    """Cloud Function triggered by scheduler"""
    
    findings = []
    findings.extend(audit_external_ips(ORG_ID))
    findings.extend(audit_owner_role(ORG_ID))
    
    if findings:
        notify_security_team(findings)
    
    return {"status": "completed", "findings": len(findings)}
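
`notify_security_team` is left undefined above. Whatever channel it uses, it needs a readable summary of the findings. A minimal sketch of that formatting step follows, assuming findings shaped like the dictionaries built by `audit_external_ips` (with an `issue` key) and `audit_owner_role` (with a `role` key); `format_findings_summary` is a hypothetical helper, and delivery by email, chat, or ticket is out of scope.

```python
from collections import defaultdict


def format_findings_summary(findings):
    """Group findings by issue (or granted role) and return a text summary."""
    grouped = defaultdict(list)
    for finding in findings:
        # External-IP findings carry "issue"; owner-role findings carry "role".
        key = finding.get("issue") or f"Role granted: {finding.get('role', 'unknown')}"
        grouped[key].append(finding["resource"])

    lines = [f"{len(findings)} compliance finding(s)"]
    for key in sorted(grouped):
        lines.append(f"- {key}: {len(grouped[key])}")
        lines.extend(f"    {resource}" for resource in grouped[key])
    return "\n".join(lines)


# Made-up findings for illustration only.
sample_findings = [
    {"resource": "vm-1", "project": "projects/1", "issue": "Unexpected external IP"},
    {"resource": "vm-2", "project": "projects/1", "issue": "Unexpected external IP"},
    {"resource": "projects/demo", "member": "user:a@example.com", "role": "roles/owner"},
]

summary = format_findings_summary(sample_findings)
print(summary)
```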
