---
name: zo-google-apis
description: |
  Reference skill for using Google APIs (Calendar, Gmail, Drive, Docs, Sheets, Contacts) via 
  direct OAuth credentials already stored on this machine. Covers authentication patterns, 
  common operations, and example code for each service. Use this when answering any question 
  that involves reading or writing Google services.
compatibility: Created for Zo Computer. Requires zo-google-direct-oauth to have been set up already.
metadata:
  author: rob.zo.computer
  category: Reference
  emoji: 🅶
---

# Google APIs Reference

Direct API access to Google Calendar, Gmail, Drive, Docs, Sheets, and Contacts — already authenticated.

## Prerequisites (already done)

- **OAuth setup**: Completed via `Skills/zo-google-direct-oauth`. Credentials and tokens live in `/home/.z/google-oauth/`.
- **Token refresh**: A background service (`google-oauth-refresh`) refreshes the access token every 4 hours.
- **Python libraries**: `google-api-python-client`, `google-auth`, `google-auth-httplib2` are installed.

## Auth Helper

All API access goes through the helper at `/home/.z/google-oauth/google_auth.py`.

```python
import sys
sys.path.insert(0, "/home/.z/google-oauth")
from google_auth import get_credentials, get_calendar_service, get_gmail_service
```

The helper auto-refreshes expired tokens. For services beyond Calendar/Gmail, build them manually:

```python
from google_auth import get_credentials
from googleapiclient.discovery import build

creds = get_credentials()
drive   = build("drive", "v3", credentials=creds)
sheets  = build("sheets", "v4", credentials=creds)
docs    = build("docs", "v1", credentials=creds)
people  = build("people", "v1", credentials=creds)
```

## Available Scopes

| Scope | Access |
|-------|--------|
| `calendar` | Full calendar read/write |
| `calendar.events` | Create/modify/delete events |
| `gmail.readonly` | Read emails |
| `gmail.send` | Send emails |
| `gmail.modify` | Modify emails (labels, archive, trash) |
| `drive` | Full Drive read/write |
| `drive.file` | Files created/opened by the app |
| `documents` | Read/write Google Docs |
| `spreadsheets` | Read/write Google Sheets |
| `contacts` | Read/write contacts |
| `contacts.readonly` | Read contacts |

## Quick Patterns

### Build any service

```python
import sys
sys.path.insert(0, "/home/.z/google-oauth")
from google_auth import get_credentials
from googleapiclient.discovery import build

creds = get_credentials()
service = build("<api>", "<version>", credentials=creds)
```

### Handle pagination

Most Google APIs paginate with `nextPageToken`:

```python
all_items = []
page_token = None
while True:
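    # The page-size parameter is maxResults (Calendar, Gmail) or pageSize (Drive, People).
    result = service.someResource().list(pageToken=page_token, maxResults=100).execute()
    # The item key varies by API: "items" (Calendar), "files" (Drive), "messages" (Gmail).
    all_items.extend(result.get("items", []) or result.get("files", []))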
    page_token = result.get("nextPageToken")
    if not page_token:
        break
```
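
The loop above can be wrapped in a reusable generator. This is a minimal sketch, not part of the auth helper: `paginate`, `list_method`, and `items_key` are hypothetical names introduced here for illustration.

```python
def paginate(list_method, items_key, **params):
    """Yield every item from a paginated list endpoint.

    list_method: a bound list method such as drive.files().list
    items_key:   response key that holds the items on each page, e.g. "files"
    params:      extra keyword arguments sent with every page request
    """
    page_token = None
    while True:
        response = list_method(pageToken=page_token, **params).execute()
        yield from response.get(items_key, [])
        page_token = response.get("nextPageToken")
        if not page_token:
            break
```

A call might look like `paginate(drive.files().list, "files", pageSize=100, fields="nextPageToken, files(id,name)")`. When `fields` is set, `nextPageToken` has to be listed explicitly, otherwise the loop stops after the first page.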

### Batch requests

```python
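def on_response(request_id, response, exception):
    # Called once per batched request; exception is set if that request failed.
    if exception is not None:
        print(f"Request {request_id} failed: {exception}")
    else:
        print(response.get('summary'))

batch = service.new_batch_http_request(callback=on_response)
batch.add(service.events().get(calendarId='primary', eventId='abc'))
batch.add(service.events().get(calendarId='primary', eventId='def'))
batch.execute()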
```

---

## Calendar API

**Service**: `build("calendar", "v3", credentials=creds)` or `get_calendar_service()`

### List upcoming events

```python
from datetime import datetime, timezone

service = get_calendar_service()
events = service.events().list(
    calendarId='primary',
    timeMin=datetime.now(timezone.utc).isoformat(),
    maxResults=20,
    singleEvents=True,
    orderBy='startTime',
).execute()

for e in events.get('items', []):
    start = e['start'].get('dateTime', e['start'].get('date'))
    # Events without a title have no 'summary' key, so fall back to a placeholder.
    print(f"{start} - {e.get('summary', '(no title)')}")
```
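
Timed events carry `start.dateTime` (an RFC 3339 timestamp) while all-day events carry only `start.date`, which is why the loop above checks both. If Python objects are more useful than strings, a small helper along these lines may help. `event_start` is a hypothetical name, not something the helper module provides.

```python
from datetime import date, datetime

def event_start(event):
    """Return a datetime for timed events or a date for all-day events."""
    start = event["start"]
    if "dateTime" in start:
        # Older Python versions reject a trailing "Z", so normalise it first.
        return datetime.fromisoformat(start["dateTime"].replace("Z", "+00:00"))
    return date.fromisoformat(start["date"])
```

Because the two cases return different types, callers that sort a mixed list need to convert all-day dates to datetimes (or the reverse) before comparing.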

### Events in a date range

```python
events = service.events().list(
    calendarId='primary',
    timeMin='2026-02-10T00:00:00Z',
    timeMax='2026-02-17T00:00:00Z',
    singleEvents=True,
    orderBy='startTime',
).execute()
```
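
`timeMin` and `timeMax` must be RFC 3339 timestamps that include a time zone offset. Rather than hand-writing the strings, they can be derived from a date. The sketch below assumes UTC by default; `rfc3339_range` is a hypothetical helper name.

```python
from datetime import date, datetime, time, timedelta, timezone

def rfc3339_range(first_day, days, tz=timezone.utc):
    """Return (timeMin, timeMax) strings covering `days` days from midnight on first_day."""
    start = datetime.combine(first_day, time.min, tzinfo=tz)
    end = start + timedelta(days=days)
    return start.isoformat(), end.isoformat()

time_min, time_max = rfc3339_range(date(2026, 2, 10), 7)
```

The two strings can be passed directly as `timeMin=time_min, timeMax=time_max` in the `events().list(...)` call above.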

### Create an event

```python
event = service.events().insert(calendarId='primary', body={
    'summary': 'Team standup',
    'location': '100 Main St',
    'description': 'Weekly sync',
    'start': {'dateTime': '2026-02-17T10:00:00', 'timeZone': 'America/New_York'},
    'end':   {'dateTime': '2026-02-17T10:30:00', 'timeZone': 'America/New_York'},
    'attendees': [{'email': 'ben@substrate.run'}],
    'reminders': {'useDefault': False, 'overrides': [{'method': 'popup', 'minutes': 10}]},
}).execute()
```

### Quick add (natural language)

```python
service.events().quickAdd(
    calendarId='primary',
    text='Coffee with Alex tomorrow 3pm at Blue Bottle'
).execute()
```

### Update an event

```python
event = service.events().get(calendarId='primary', eventId='eventId123').execute()
event['summary'] = 'Updated title'
service.events().update(calendarId='primary', eventId='eventId123', body=event).execute()
```

### Delete an event

```python
service.events().delete(calendarId='primary', eventId='eventId123').execute()
```

### List all calendars

```python
cals = service.calendarList().list().execute()
for cal in cals['items']:
    print(f"{cal['id']} — {cal['summary']} (access: {cal.get('accessRole')})")
```

### Search events

```python
events = service.events().list(
    calendarId='primary',
    q='investor meeting',
    singleEvents=True,
    orderBy='startTime',
).execute()
```

---

## Gmail API

**Service**: `build("gmail", "v1", credentials=creds)` or `get_gmail_service()`

### List recent messages

```python
gmail = get_gmail_service()
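results = gmail.users().messages().list(userId='me', maxResults=20).execute()
# Each entry in results['messages'] holds only 'id' and 'threadId';
# call messages().get() (see "Read a message") to fetch headers and content.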
```

### Search messages

Gmail search uses the same query syntax as the Gmail UI:

```python
# Unread from a specific sender
results = gmail.users().messages().list(
    userId='me',
    q='from:someone@example.com is:unread',
    maxResults=20,
).execute()

# Date range
results = gmail.users().messages().list(
    userId='me',
    q='after:2026/02/10 before:2026/02/15 subject:Zo',
).execute()
```

### Read a message

```python
msg = gmail.users().messages().get(userId='me', id=msg_id, format='full').execute()

headers = {h['name']: h['value'] for h in msg['payload']['headers']}
print(headers['Subject'], headers['From'], headers['Date'])
print(msg['snippet'])
```
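
Header names are case-insensitive in email, and a message may arrive with `subject` rather than `Subject`, or with no subject at all, in which case the direct indexing above raises `KeyError`. A more forgiving lookup could look like the sketch below; `header_map` is a hypothetical helper name.

```python
def header_map(payload):
    """Map lower-cased header names to values; the first occurrence of a name wins."""
    headers = {}
    for h in payload.get("headers", []):
        headers.setdefault(h["name"].lower(), h["value"])
    return headers
```

With this, `header_map(msg['payload']).get('subject', '(no subject)')` returns a placeholder instead of failing when the header is missing.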

### Get full message body

```python
import base64

def get_body(payload):
    if payload.get('body', {}).get('data'):
        return base64.urlsafe_b64decode(payload['body']['data']).decode()
    for part in payload.get('parts', []):
        if part['mimeType'] == 'text/plain' and part.get('body', {}).get('data'):
            return base64.urlsafe_b64decode(part['body']['data']).decode()
        if part.get('parts'):
            result = get_body(part)
            if result:
                return result
    return ''

body = get_body(msg['payload'])
```

### Send an email

```python
import base64
from email.mime.text import MIMEText

message = MIMEText('Hello, this is the body.')
message['to'] = 'someone@example.com'
message['from'] = 'rob@substrate.run'
message['subject'] = 'Subject line'

raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
gmail.users().messages().send(userId='me', body={'raw': raw}).execute()
```

### Send HTML email

```python
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

msg = MIMEMultipart('alternative')
msg['to'] = 'someone@example.com'
msg['subject'] = 'HTML email'
msg.attach(MIMEText('Plain text fallback', 'plain'))
msg.attach(MIMEText('<h1>Hello</h1><p>HTML body</p>', 'html'))

raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail.users().messages().send(userId='me', body={'raw': raw}).execute()
```

### Reply to a thread

```python
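import base64
from email.mime.text import MIMEText

# original_message_id is the parent's Message-ID header value
# (e.g. '<abc123@mail.gmail.com>'), not the Gmail API message id.
reply = MIMEText('Thanks for following up.')
reply['to'] = 'sender@example.com'
reply['subject'] = 'Re: Original subject'
reply['In-Reply-To'] = original_message_id
reply['References'] = original_message_id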

raw = base64.urlsafe_b64encode(reply.as_bytes()).decode()
gmail.users().messages().send(userId='me', body={'raw': raw, 'threadId': thread_id}).execute()
```
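
For longer threads, the `References` header conventionally carries the parent's own `References` chain followed by the parent's `Message-ID`, and the subject should match the thread. The sketch below packages that into one function. `build_reply_raw` and its parameters are hypothetical names, and the header values are assumed to come from the parent message's headers.

```python
import base64
from email.mime.text import MIMEText

def build_reply_raw(to, subject, body, parent_message_id, parent_references=""):
    """Return a base64url-encoded reply that threads under the parent message.

    parent_message_id is the parent's Message-ID header value
    (e.g. "<abc@mail.example.com>"), not the Gmail API message id.
    """
    reply = MIMEText(body)
    reply["To"] = to
    # Keep the subject aligned with the thread, adding "Re:" only when missing.
    reply["Subject"] = subject if subject.lower().startswith("re:") else f"Re: {subject}"
    reply["In-Reply-To"] = parent_message_id
    # References = the parent's References chain plus the parent's own Message-ID.
    reply["References"] = f"{parent_references} {parent_message_id}".strip()
    return base64.urlsafe_b64encode(reply.as_bytes()).decode()
```

The returned string goes into `body={'raw': raw, 'threadId': thread_id}` exactly as in the send call above.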

### Create a draft

```python
import base64
from email.mime.text import MIMEText

message = MIMEText('Draft body')
message['to'] = 'someone@example.com'
message['subject'] = 'Draft subject'
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()

gmail.users().drafts().create(userId='me', body={
    'message': {'raw': raw}
}).execute()
```

### Manage labels

```python
# List labels
labels = gmail.users().labels().list(userId='me').execute()

# Add/remove labels
gmail.users().messages().modify(userId='me', id=msg_id, body={
    'addLabelIds': ['STARRED'],
    'removeLabelIds': ['UNREAD'],
}).execute()
```

### Get threads

```python
thread = gmail.users().threads().get(userId='me', id=thread_id, format='full').execute()
for msg in thread['messages']:
    h = {hh['name']: hh['value'] for hh in msg['payload']['headers']}
    print(f"{h['From']}: {msg['snippet'][:100]}")
```

---

## Drive API

**Service**: `build("drive", "v3", credentials=creds)`

### List recent files

```python
drive = build("drive", "v3", credentials=creds)
results = drive.files().list(
    pageSize=20,
    orderBy='modifiedTime desc',
    fields='files(id,name,mimeType,modifiedTime,owners,webViewLink)',
).execute()
```

### Search files

```python
# By name
results = drive.files().list(q="name contains 'investor'", pageSize=20).execute()

# By type
results = drive.files().list(
    q="mimeType='application/vnd.google-apps.spreadsheet'",
    orderBy='modifiedTime desc',
).execute()

# In a specific folder
results = drive.files().list(q="'FOLDER_ID' in parents").execute()
```
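
Search terms that come from user input need escaping before they are spliced into a `q` string: Drive query syntax expects single quotes and backslashes inside a quoted value to be backslash-escaped. A minimal sketch, with `drive_literal` and `name_contains` as hypothetical helper names:

```python
def drive_literal(value):
    """Quote a string for use inside a Drive `q` expression."""
    # Escape backslashes first so the quote escapes are not doubled afterwards.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"

def name_contains(term):
    """Build a `name contains '...'` clause with the term safely quoted."""
    return f"name contains {drive_literal(term)}"
```

Clauses can be combined with `and`, for example `q=name_contains("Valentine's Day") + " and trashed = false"` to skip files in the trash.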

### Common MIME types

| Type | MIME |
|------|------|
| Google Doc | `application/vnd.google-apps.document` |
| Google Sheet | `application/vnd.google-apps.spreadsheet` |
| Google Slides | `application/vnd.google-apps.presentation` |
| Folder | `application/vnd.google-apps.folder` |
| PDF | `application/pdf` |

### Download / export

```python
# Export Google Doc as plain text
from googleapiclient.http import MediaIoBaseDownload
import io

request = drive.files().export_media(fileId=file_id, mimeType='text/plain')
buf = io.BytesIO()
downloader = MediaIoBaseDownload(buf, request)
done = False
while not done:
    _, done = downloader.next_chunk()
text = buf.getvalue().decode()

# Export formats: text/plain, text/html, application/pdf,
# application/vnd.openxmlformats-officedocument.wordprocessingml.document
```

### Upload a file

```python
from googleapiclient.http import MediaFileUpload

media = MediaFileUpload('/path/to/file.pdf', mimetype='application/pdf')
file = drive.files().create(
    body={'name': 'file.pdf', 'parents': ['FOLDER_ID']},
    media_body=media,
).execute()
```

### Create a folder

```python
folder = drive.files().create(body={
    'name': 'New Folder',
    'mimeType': 'application/vnd.google-apps.folder',
}).execute()
```

---

## Sheets API

**Service**: `build("sheets", "v4", credentials=creds)`

### Read a range

```python
sheets = build("sheets", "v4", credentials=creds)
result = sheets.spreadsheets().values().get(
    spreadsheetId=SPREADSHEET_ID,
    range="Sheet1!A1:D10",
).execute()
rows = result.get('values', [])
```
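
The `values` list omits trailing empty cells, so rows can be shorter than the header row. If the first row holds column names, a helper like the following pads short rows and returns one dict per data row. `rows_to_dicts` is a hypothetical name used for illustration.

```python
def rows_to_dicts(rows):
    """Treat the first row as headers and pad short rows with empty strings."""
    if not rows:
        return []
    header, *data = rows
    return [
        dict(zip(header, row + [""] * (len(header) - len(row))))
        for row in data
    ]
```

Cells beyond the last header column are dropped by `zip`, which is usually what is wanted for tabular data.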

### Read entire sheet

```python
result = sheets.spreadsheets().values().get(
    spreadsheetId=SPREADSHEET_ID,
    range="Sheet1",
).execute()
```

### Write cells

```python
sheets.spreadsheets().values().update(
    spreadsheetId=SPREADSHEET_ID,
    range="Sheet1!A1:C2",
    valueInputOption="USER_ENTERED",
    body={"values": [["Name", "Age", "City"], ["Rob", "30", "NYC"]]},
).execute()
```
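
When the data size is not known in advance, the A1 range can be computed from the values instead of typed by hand. The sketch below uses hypothetical helpers `column_letter` and `a1_range`, and assumes a sheet name without spaces or special characters (such names need to be wrapped in single quotes).

```python
def column_letter(index):
    """Convert a 1-based column index to A1 letters (1 -> A, 27 -> AA)."""
    letters = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord("A") + remainder) + letters
    return letters

def a1_range(sheet, values, start_row=1, start_col=1):
    """Return the A1 range that exactly covers a non-empty 2-D list of values."""
    end_row = start_row + len(values) - 1
    end_col = start_col + max(len(row) for row in values) - 1
    return f"{sheet}!{column_letter(start_col)}{start_row}:{column_letter(end_col)}{end_row}"
```

For the values in the example above, `a1_range("Sheet1", values)` yields `Sheet1!A1:C2`.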

### Append rows

```python
sheets.spreadsheets().values().append(
    spreadsheetId=SPREADSHEET_ID,
    range="Sheet1!A1",
    valueInputOption="USER_ENTERED",
    insertDataOption="INSERT_ROWS",
    body={"values": [["New", "Row", "Data"]]},
).execute()
```

### Get sheet metadata

```python
meta = sheets.spreadsheets().get(spreadsheetId=SPREADSHEET_ID).execute()
for sheet in meta['sheets']:
    print(sheet['properties']['title'], sheet['properties']['sheetId'])
```

### Clear a range

```python
sheets.spreadsheets().values().clear(
    spreadsheetId=SPREADSHEET_ID,
    range="Sheet1!A2:D100",
).execute()
```

### Batch update (formatting, add sheets, etc.)

```python
sheets.spreadsheets().batchUpdate(spreadsheetId=SPREADSHEET_ID, body={
    "requests": [
        {"addSheet": {"properties": {"title": "New Tab"}}},
        {"repeatCell": {
            "range": {"sheetId": 0, "startRowIndex": 0, "endRowIndex": 1},
            "cell": {"userEnteredFormat": {"textFormat": {"bold": True}}},
            "fields": "userEnteredFormat.textFormat.bold",
        }},
    ]
}).execute()
```

---

## Docs API

**Service**: `build("docs", "v1", credentials=creds)`

### Read a document

```python
docs = build("docs", "v1", credentials=creds)
doc = docs.documents().get(documentId=DOC_ID).execute()
print(doc['title'])

# Extract plain text
text = ''
for elem in doc['body']['content']:
    if 'paragraph' in elem:
        for run in elem['paragraph']['elements']:
            text += run.get('textRun', {}).get('content', '')
```
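
The loop above only visits top-level paragraphs, so text inside tables is skipped. Table cells contain their own list of structural elements, which suggests a recursive walk. The following is a sketch under that assumption; `read_structural_elements` is a name chosen here, not part of the client library.

```python
def read_structural_elements(elements):
    """Recursively collect text from paragraphs, tables, and tables of contents."""
    text = ""
    for elem in elements:
        if "paragraph" in elem:
            for run in elem["paragraph"].get("elements", []):
                text += run.get("textRun", {}).get("content", "")
        elif "table" in elem:
            # Each cell has its own "content" list of structural elements.
            for row in elem["table"].get("tableRows", []):
                for cell in row.get("tableCells", []):
                    text += read_structural_elements(cell.get("content", []))
        elif "tableOfContents" in elem:
            text += read_structural_elements(elem["tableOfContents"].get("content", []))
    return text
```

Calling `read_structural_elements(doc['body']['content'])` returns the same text as the loop above for documents without tables.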

### Create a document

```python
doc = docs.documents().create(body={'title': 'New Document'}).execute()
doc_id = doc['documentId']
```

### Insert text

```python
docs.documents().batchUpdate(documentId=doc_id, body={
    'requests': [
        {'insertText': {'location': {'index': 1}, 'text': 'Hello, world!\n'}},
    ]
}).execute()
```

### Replace text

```python
docs.documents().batchUpdate(documentId=doc_id, body={
    'requests': [
        {'replaceAllText': {
            'containsText': {'text': '{{PLACEHOLDER}}', 'matchCase': True},
            'replaceText': 'Actual value',
        }},
    ]
}).execute()
```

---

## People (Contacts) API

**Service**: `build("people", "v1", credentials=creds)`

### List contacts

```python
people = build("people", "v1", credentials=creds)
results = people.people().connections().list(
    resourceName='people/me',
    pageSize=100,
    personFields='names,emailAddresses,phoneNumbers,organizations',
).execute()

for person in results.get('connections', []):
    names = person.get('names', [{}])
    emails = person.get('emailAddresses', [])
    name = names[0].get('displayName', '?') if names else '?'
    email = emails[0].get('value', '') if emails else ''
    print(f"{name} — {email}")
```

### Search contacts

```python
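# The People API docs recommend a warmup request with an empty query
# before the first search, so the search cache is populated.
people.people().searchContacts(query='', readMask='names').execute()

results = people.people().searchContacts(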
    query='Rob',
    readMask='names,emailAddresses,phoneNumbers',
    pageSize=10,
).execute()
```

### Get "other contacts" (auto-saved from Gmail)

```python
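# Requires the contacts.other.readonly scope, which is not listed under
# Available Scopes; expect a 403 if the stored token lacks it.
results = people.otherContacts().list(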
    pageSize=100,
    readMask='names,emailAddresses',
).execute()
```

---

## Error Handling

```python
import time

from googleapiclient.errors import HttpError

try:
    result = service.events().list(calendarId='primary').execute()
except HttpError as e:
    if e.resp.status == 404:
        print("Not found")
    elif e.resp.status == 403:
        print("Forbidden: check scopes")
    elif e.resp.status == 429:
        print("Rate limited, backing off")
        time.sleep(5)  # wait before retrying; this snippet does not retry by itself
    else:
        raise
```
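
The handler above sleeps on a 429 but does not retry the call. One way to retry is exponential backoff with jitter, sketched below. `execute_with_backoff`, `RETRYABLE_STATUSES`, and the chosen status set are assumptions made for illustration; the function reads the status from `exc.resp.status`, the attribute the handler above uses.

```python
import random
import time

# Assumed set of statuses worth retrying: rate limiting and transient server errors.
RETRYABLE_STATUSES = {429, 500, 502, 503, 504}

def execute_with_backoff(request, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Execute a request, retrying retryable HTTP errors with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return request.execute()
        except Exception as exc:
            # HttpError exposes the status as exc.resp.status; other errors are re-raised.
            status = getattr(getattr(exc, "resp", None), "status", None)
            if status not in RETRYABLE_STATUSES or attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt, plus up to one second of random jitter.
            sleep(base_delay * 2 ** attempt + random.uniform(0, 1))
```

Usage would be `execute_with_backoff(service.events().list(calendarId='primary'))`, passing the request object before `.execute()` is called. The client library's `execute(num_retries=...)` argument is a built-in alternative for simple cases.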

## Rate Limits (approximate)

| Service | Default | Per-user |
|---------|---------|----------|
| Calendar | 1M/day | 500/100s |
| Gmail | 250 units/s | varies by method |
| Drive | 20K/100s | 12K/100s |
| Sheets | 300/min read, 60/min write | per-spreadsheet |
| Docs | 300/min read, 60/min write | — |
| People | 90/min | — |

## Tips

- Always use `userId='me'` for Gmail (not an email address).
- Calendar `singleEvents=True` expands recurring events; without it you get the recurrence master.
- Sheets `valueInputOption="USER_ENTERED"` parses formulas and dates; `"RAW"` stores literally.
- Drive search queries use a [specific syntax](https://developers.google.com/drive/api/guides/search-files), not free text.
- For large reads, use the `fields` parameter to limit response size (e.g., `fields='files(id,name)'`).
