Cloud Asset Inventory: Querying Resource State & Compliance
Why Cloud Asset Inventory Is Needed
As the hierarchy scales, it becomes extremely difficult to maintain an accurate view of all resources:
Organization with 1,000 projects and 100,000 resources:
- Which VMs have external IPs? (Compliance violation if unexpected)
- Who has the Owner role? (Security audit)
- Which resources are in a soft-delete state? (Quota tracking)
- Which Compute Engine resources do not comply with security policy?
- What's the current state of resource X? (Troubleshooting)
Manual approaches fail:
- Querying each service individually (slow, incomplete)
- Running gcloud commands (not scalable)
- Parsing audit logs (reactive, not proactive)
Cloud Asset Inventory provides:
- Unified view of all resources (200+ resource types)
- Queryable API with filtering/expression support
- Real-time resource state (live queries) + historical (via exports)
- Compliance tooling (Policy Analyzer, Security Command Center integration)
Architecture
Cloud Asset Inventory maintains two data stores:
1. Real-time Asset Metadata
Organization → Folder → Project → Resources
↓
Cloud Asset Inventory (indexed database)
↓
Query API
Characteristics:
- Live resource state
- Queryable within seconds
- Regional storage (depends on asset location)
- Full hierarchical context
2. Historical Exports
Real-time data → BigQuery / Cloud Storage exports
↓
Historical analysis
Trend detection
Compliance auditing
Characteristics:
- Full historical record
- Queryable via BigQuery SQL
- Custom retention policies
- Cost: Storage + BigQuery query
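A minimal sketch of producing such an export with the Python client, assuming the target BigQuery dataset already exists. The project, dataset, and table names are hypothetical placeholders, and `bigquery_destination_args` is a helper introduced here for illustration rather than part of the client library.

```python
def bigquery_destination_args(export_project, dataset_id, table_name):
    """Build keyword arguments for asset_v1.BigQueryDestination."""
    return {
        "dataset": f"projects/{export_project}/datasets/{dataset_id}",
        "table": table_name,
        "force": True,  # overwrite the table if it already exists
    }


def export_assets_to_bigquery(org_id, export_project, dataset_id, table_name):
    """Export a snapshot of resource metadata for an organization to BigQuery."""
    # Imported lazily so the helper above works without the client library.
    from google.cloud import asset_v1

    client = asset_v1.AssetServiceClient()
    request = asset_v1.ExportAssetsRequest(
        parent=f"organizations/{org_id}",
        content_type=asset_v1.ContentType.RESOURCE,
        output_config=asset_v1.OutputConfig(
            bigquery_destination=asset_v1.BigQueryDestination(
                **bigquery_destination_args(export_project, dataset_id, table_name)
            )
        ),
    )
    operation = client.export_assets(request=request)
    return operation.result()  # blocks until the export finishes
```

Running the export on a schedule into dated tables is one way to build up the historical record described above.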
Query Patterns
Pattern 1: Find all resources of a type
bash
# Find all Compute Engine instances
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--asset-types=compute.googleapis.com/Instance
# Find all Cloud Storage buckets
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--asset-types=storage.googleapis.com/Bucket
# Find multiple types
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--asset-types=compute.googleapis.com/Instance,storage.googleapis.com/Bucket
Pattern 2: Filter by resource attributes
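Filters like the ones in this pattern can be combined into a single query string. The sketch below assumes the filters are joined with the `AND` operator of the search query syntax; `build_search_query` is a name introduced here for illustration, and the field names passed to it must still be fields the search API actually supports.

```python
def build_search_query(**filters):
    """Join field:value filters with AND; filters set to None are skipped."""
    parts = [
        f"{field}:{value}"
        for field, value in filters.items()
        if value is not None
    ]
    return " AND ".join(parts)


query = build_search_query(project="my-project-prod", location="us-central1")
print(query)  # project:my-project-prod AND location:us-central1
```

The resulting string can be passed to `--query` or to the `query` field of a search request.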
bash
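# Find VMs with external IPs (potential security issue)
# Note: the search API matches only its documented searchable fields;
# verify this field path for your asset type before relying on the result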
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--asset-types=compute.googleapis.com/Instance \
--query="networkInterfaces.accessConfigs.natIP:*"
# Find resources in specific project
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--query="project:my-project-prod"
# Find resources by location
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--query="location:us-central1"
Pattern 3: Compliance auditing
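The external IP audit in this pattern calls `is_external_ip_allowed`, which the text does not define. A minimal sketch of such a check, assuming exemptions are granted either per project or through a label on the instance. The allowlist contents and the `allow-external-ip` label key are hypothetical and would come from your own policy.

```python
from types import SimpleNamespace

# Hypothetical exemptions; result.project has the form "projects/PROJECT_NUMBER"
ALLOWED_PROJECTS = {"projects/123456789"}
EXEMPT_LABEL = "allow-external-ip"  # hypothetical label key


def is_external_ip_allowed(result):
    """Return True if a search result is exempt from the external IP rule."""
    if result.project in ALLOWED_PROJECTS:
        return True
    labels = dict(result.labels or {})
    return labels.get(EXEMPT_LABEL) == "true"


# Stand-ins for search results, for illustration only
exempt_vm = SimpleNamespace(project="projects/42", labels={"allow-external-ip": "true"})
plain_vm = SimpleNamespace(project="projects/42", labels={})
print(is_external_ip_allowed(exempt_vm), is_external_ip_allowed(plain_vm))
```

Keeping the exemption logic in one function means the audit loop stays unchanged when the policy does.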
python
from google.cloud import asset_v1
def audit_external_ips(org_id):
    """Find all VMs with unexpected external IPs."""
    client = asset_v1.AssetServiceClient()
    # Query VMs with external IPs (verify that this field path is
    # searchable for your asset type before relying on the result)
    query = "networkInterfaces.accessConfigs.natIP:*"
    request = asset_v1.SearchAllResourcesRequest(
        scope=f"organizations/{org_id}",
        asset_types=["compute.googleapis.com/Instance"],
        query=query,
    )
    findings = []
    for result in client.search_all_resources(request=request):
        # is_external_ip_allowed is a user-supplied policy check
        if not is_external_ip_allowed(result):
            findings.append({
                "resource": result.name,
                "project": result.project,
                "issue": "Unexpected external IP",
            })
    return findings
def audit_owner_role(org_id):
    """Find all users/service accounts with the Owner role."""
    client = asset_v1.AssetServiceClient()
    # Query IAM bindings that grant roles/owner
    request = asset_v1.SearchAllIamPoliciesRequest(
        scope=f"organizations/{org_id}",
        query="policy:roles/owner",
    )
    findings = []
    for result in client.search_all_iam_policies(request=request):
        # Bindings live on the policy attached to each search result
        for binding in result.policy.bindings:
            if binding.role == "roles/owner":
                for member in binding.members:
                    findings.append({
                        "resource": result.resource,
                        "member": member,
                        "role": binding.role,
                    })
    return findings
Drift Detection
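The comparison at the core of drift detection can be isolated as a pure function, which makes it testable without calling any API. The sketch below is one possible approach, not the method used in this section's example: it recurses into nested mappings and reports drifted fields by dotted path. `diff_config` and the sample field names are introduced here for illustration.

```python
from collections.abc import Mapping


def diff_config(expected, actual, prefix=""):
    """Return {dotted_path: {"expected": ..., "actual": ...}} for drifted fields."""
    drifts = {}
    for key, expected_value in expected.items():
        path = f"{prefix}{key}"
        actual_value = actual.get(key)
        if isinstance(expected_value, Mapping) and isinstance(actual_value, Mapping):
            # Recurse so nested settings are compared field by field
            drifts.update(diff_config(expected_value, actual_value, f"{path}."))
        elif actual_value != expected_value:
            drifts[path] = {"expected": expected_value, "actual": actual_value}
    return drifts


expected = {"machineType": "e2-medium", "shieldedInstanceConfig": {"enableSecureBoot": True}}
actual = {"machineType": "e2-medium", "shieldedInstanceConfig": {"enableSecureBoot": False}}
print(diff_config(expected, actual))
```

Only keys present in the expected configuration are checked, so extra fields on the live resource are ignored.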
Cloud Asset Inventory enables automated compliance checking:
python
def detect_resource_drift(project_id, resource_type, resource_id, expected_config):
    """Detect whether a resource has drifted from its expected state."""
    from google.cloud import asset_v1
    client = asset_v1.AssetServiceClient()
    # Get current asset state; content_type=RESOURCE is required
    # for the response to include resource data
    request = asset_v1.ListAssetsRequest(
        parent=f"projects/{project_id}",
        asset_types=[resource_type],
        content_type=asset_v1.ContentType.RESOURCE,
    )
    actual_config = None
    for asset in client.list_assets(request=request):
        # asset.name is the full resource name (//service/projects/...)
        if asset.name == resource_id:
            actual_config = asset.resource.data
            break
    if actual_config is None:
        return {"status": "MISSING", "drift": expected_config}
    # Compare top-level fields
    drifts = {}
    for key, expected_value in expected_config.items():
        actual_value = actual_config.get(key)
        if actual_value != expected_value:
            drifts[key] = {
                "expected": expected_value,
                "actual": actual_value,
            }
    if drifts:
        return {"status": "DRIFTED", "drifts": drifts}
    return {"status": "COMPLIANT"}
IAM Policy Auditing
bash
# Find all bindings that grant a specific role
gcloud asset search-all-iam-policies \
--scope=organizations/ORG_ID \
--query='policy:roles/iam.serviceAccountUser'
# Find a specific member's bindings
gcloud asset search-all-iam-policies \
--scope=organizations/ORG_ID \
--query='policy:user@example.com'
# Export all IAM policies to BigQuery
# (DATASET_ID and TABLE_NAME are placeholders)
gcloud asset export \
--organization=ORG_ID \
--content-type=iam-policy \
--bigquery-table=projects/EXPORT_PROJECT/datasets/DATASET_ID/tables/TABLE_NAME \
--output-bigquery-force
Historical Analysis via BigQuery
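The queries in this section reference tables with a `YYYYMM` suffix. Assuming exports are written to one table per month under that naming scheme (a convention implied by the placeholders, not something Cloud Asset Inventory enforces), the table for a given date can be derived as in this sketch. `monthly_table` is a name introduced here for illustration.

```python
from datetime import date


def monthly_table(project, dataset, prefix, day):
    """Return the fully qualified table name for the month containing `day`."""
    return f"{project}.{dataset}.{prefix}_{day:%Y%m}"


table = monthly_table("project", "dataset", "compute_instances", date(2024, 3, 15))
print(table)  # project.dataset.compute_instances_202403
```

The returned name can be substituted into the `FROM` clause of a query before it is submitted.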
sql
-- Query historical asset data
-- (after exporting Cloud Asset Inventory to BigQuery; table and column
-- names are placeholders that depend on how the export was configured)
SELECT
TIMESTAMP(DATE(timestamp)) as date,
asset_type,
COUNT(*) as resource_count
FROM `project.dataset.cloudresourcemanager_assets_YYYYMM`
WHERE asset_type = 'compute.googleapis.com/Instance'
GROUP BY date, asset_type
ORDER BY date DESC;
-- Detect newly created resources
SELECT
name,
display_name,
create_time
FROM `project.dataset.compute_instances_YYYYMM`
WHERE DATE(create_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
ORDER BY create_time DESC;
-- Find resources without required labels
SELECT
name,
display_name,
STRUCT(
JSON_EXTRACT_SCALAR(resource.data, '$.labels.environment') as environment,
JSON_EXTRACT_SCALAR(resource.data, '$.labels.team') as team
) as labels
FROM `project.dataset.compute_instances_YYYYMM`
WHERE DATE(timestamp) = CURRENT_DATE()
AND (
JSON_EXTRACT_SCALAR(resource.data, '$.labels.environment') IS NULL
OR JSON_EXTRACT_SCALAR(resource.data, '$.labels.team') IS NULL
);
Security Command Center Integration
python
import uuid
from datetime import datetime, timezone
from google.cloud import securitycenter_v1
def create_compliance_finding(org_id, resource_name, issue):
    """Create a Security Command Center finding from a compliance check."""
    client = securitycenter_v1.SecurityCenterClient()
    parent = f"organizations/{org_id}"
    # FINDING_SOURCE_ID is the ID of an existing SCC source
    source_name = f"{parent}/sources/FINDING_SOURCE_ID"
    finding = securitycenter_v1.Finding(
        state=securitycenter_v1.Finding.State.ACTIVE,
        resource_name=resource_name,
        category="COMPLIANCE_VIOLATION",  # example category name
        event_time=datetime.now(timezone.utc),
        finding_class=securitycenter_v1.Finding.FindingClass.MISCONFIGURATION,
        severity=securitycenter_v1.Finding.Severity.HIGH,
        source_properties={
            "issue": issue,
            "recommendation": "Fix configuration",
        },
    )
    # create_finding requires a caller-chosen ID (alphanumeric, at most 32 characters)
    client.create_finding(
        parent=source_name,
        finding_id=uuid.uuid4().hex,
        finding=finding,
    )
Scalability Considerations
Large-scale queries
bash
# For large orgs, queries may take time
# Use filters to narrow scope
# ❌ Too broad
gcloud asset search-all-resources \
--scope=organizations/ORG_ID
# ✅ Narrower
gcloud asset search-all-resources \
--scope=organizations/ORG_ID \
--asset-types=compute.googleapis.com/Instance \
--query="location:us-central1"
Export vs Query Trade-offs
Use Real-time Query API when:
- Need current state
- One-off queries
- < 100k resources
Use BigQuery Exports when:
- Historical analysis
- Recurring queries (cost-effective)
- Large-scale compliance reports
- Trend analysis
Cost Considerations
Cloud Asset Inventory:
- Real-time queries: Free (included)
- Exports: $1 per 1M resources per month
- BigQuery: Standard query pricing ($6.25 per TB)
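The arithmetic behind a monthly estimate can be sketched as a small function. The default rates mirror the figures quoted in this section (the per-query cost is the estimate used in the example that follows), so treat them as assumptions to check against current pricing. `estimate_monthly_cost` is a name introduced here for illustration.

```python
def estimate_monthly_cost(resources, queries,
                          export_rate_per_million=1.00,
                          cost_per_query=0.01):
    """Estimate monthly cost in USD from resource count and query count."""
    export_cost = resources / 1_000_000 * export_rate_per_million
    query_cost = queries * cost_per_query
    return {
        "exports": round(export_cost, 2),
        "queries": round(query_cost, 2),
        "total": round(export_cost + query_cost, 2),
    }


estimate = estimate_monthly_cost(resources=100_000, queries=10)
print(estimate)
```

Passing the rates as parameters keeps the estimate easy to update when pricing changes.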
Example org with 100k resources:
- Exports: $0.10/month
- Queries: 10 queries × $0.01 per query (est) = $0.10/month
- Total: ~$0.20/month (very cheap)
Monitoring via Policy Analyzer
Policy Analyzer detects IAM policy misconfigurations:
python
def analyze_iam_policy(org_id):
    """Check for IAM bindings that grant access to the public."""
    from google.cloud import asset_v1
    client = asset_v1.AssetServiceClient()
    # Only the identity selector is set, so the analysis covers every
    # resource and role granted to that identity within the scope
    query = asset_v1.IamPolicyAnalysisQuery(
        scope=f"organizations/{org_id}",
        identity_selector=asset_v1.IamPolicyAnalysisQuery.IdentitySelector(
            # Public access as it appears in IAM bindings; confirm that
            # Policy Analyzer accepts this principal in your environment
            identity="allUsers"
        ),
    )
    response = client.analyze_iam_policy(
        request=asset_v1.AnalyzeIamPolicyRequest(analysis_query=query)
    )
    # Each result is a binding that includes the selected identity
    for result in response.main_analysis.analysis_results:
        print(f"⚠️ {result.attached_resource_full_name} allows public access "
              f"via {result.iam_binding.role}")
        if not result.fully_explored:
            # fully_explored is False when the analysis could not finish
            print("   (analysis incomplete for this binding)")
Automation Patterns
python
# Continuous compliance checking
def schedule_compliance_audit():
    """Run compliance checks on a schedule."""
    from google.cloud import scheduler_v1
    # Create Cloud Scheduler job
    scheduler_client = scheduler_v1.CloudSchedulerClient()
    parent = "projects/PROJECT_ID/locations/us-central1"
    job = scheduler_v1.Job(
        name=f"{parent}/jobs/compliance-audit",
        description="Daily compliance audit",
        schedule="0 2 * * *",  # 2 AM daily
        time_zone="UTC",
        http_target=scheduler_v1.HttpTarget(
            uri="https://us-central1-PROJECT_ID.cloudfunctions.net/audit",
            http_method=scheduler_v1.HttpMethod.POST,
        ),
    )
    scheduler_client.create_job(parent=parent, job=job)
# Cloud Function for compliance audit
def compliance_audit(request):
    """Cloud Function triggered by the scheduler."""
    # ORG_ID and notify_security_team must be defined by the deployment;
    # the audit functions are the ones from Pattern 3
    findings = []
    findings.extend(audit_external_ips(ORG_ID))
    findings.extend(audit_owner_role(ORG_ID))
    if findings:
        notify_security_team(findings)
    return {"status": "completed", "findings": len(findings)}
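The audit function calls `notify_security_team`, which is not defined in the text. A minimal sketch, assuming findings are dictionaries shaped like those returned by the Pattern 3 audit functions and that a log entry is an acceptable notification channel. A real deployment might send to chat, email, or a ticketing system instead; `format_findings` is a helper introduced here for illustration.

```python
import logging

logger = logging.getLogger("compliance_audit")


def format_findings(findings):
    """Render findings as a plain-text summary, one line per finding."""
    lines = [f"{len(findings)} compliance finding(s):"]
    for finding in findings:
        # External IP findings carry "issue"; Owner role findings carry member/role
        issue = finding.get("issue") or f"{finding.get('member')} has {finding.get('role')}"
        lines.append(f"- {finding['resource']}: {issue}")
    return "\n".join(lines)


def notify_security_team(findings):
    """Log the summary at WARNING level (stand-in for a real alert channel)."""
    logger.warning(format_findings(findings))


sample = [
    {"resource": "//compute/vm-1", "project": "projects/1", "issue": "Unexpected external IP"},
    {"resource": "//crm/projects/1", "member": "user:a@example.com", "role": "roles/owner"},
]
print(format_findings(sample))
```

Separating formatting from delivery keeps the message testable and lets the delivery channel change without touching the audit code.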