One of the exciting things about local-first software is the potential to eliminate APIs and microservices. Instead of coding across the network, you code against a local store, data syncs in the background and your stack is suddenly much simpler.
But what if you don't want to eliminate your API? What if you want or need to keep it? How do you develop local-first software then?
With Electric, you can develop local-first apps incrementally, using your existing API.
The Toaster Project
There's a great book by Harvey Molotch called Where Stuff Comes From, which explores how nothing exists in isolation. One of his examples is a toaster.
At first glance, a toaster seems like a pretty straightforward, standalone product. However, look a bit closer and it integrates with a huge number of other things. Like sliced bread and the whole supply chain behind it. It runs on electricity. Through a standard plug. It sits on a worktop. The spring in the lever that you press down to load the bread is calibrated to the strength of your arm.
Your API is a toaster. It doesn't exist in isolation. It's tied into other systems, like your monitoring systems and the way you do migrations and deployment. It's hard to just rip it out, because then you break these integrations and ergonomics — and obviate your own tooling and operational experience.
For example, REST APIs are stateless. We know how to scale them. We know how to debug them. They show up in the browser console. Swapping them out is all very well in theory, but what happens when your new system goes down in production?
Electric's approach
At Electric, our mission is to make sync and local-first adoptable for mainstream software. So, one of the main challenges we've focused on is how to use Electric with your existing software stack.
This is why we work with any data model in any standard Postgres. It's why we allow you to sync data into anything from a JavaScript object to a local database. And it's why we focus on providing composable primitives rather than a one-size-fits-all solution.
As a result, with Electric, you can develop local-first apps incrementally, using your existing API. So you can get the benefits of local-first, without having to re-engineer your stack or re-invent sliced bread, just to make toast in the morning.
How it works
First use Electric to sync data into your app. This allows your app to work with local data without it getting stale.
Then use your API to handle:
- writes
- auth
As well as, optionally, other concerns like:
- encryption
- filtering
Because Electric syncs data over HTTP, you can use existing middleware, integrations and instrumentation. Like authorization services and the browser console.
Electric sync
To build local-first, you have to have the data locally. If you're getting it there with data fetching, then you have a stale data problem: if you're working with local data without keeping it in sync, how do you know it's not stale?
This is why you need data sync. To keep the local data fresh when it changes.
Happily, this is exactly what Electric does. It syncs data into local apps and services and keeps it fresh for you. Practically, what does this look like? Well, instead of fetching data using web service calls, i.e. something like this:
import React, { useState, useEffect } from 'react'
const MyComponent = () => {
const [items, setItems] = useState([])
useEffect(() => {
const fetchItems = async () => {
const response = await fetch('https://example.com/v1/api/items')
const data = await response.json()
setItems(data)
}
fetchItems()
}, [])
return (
<List items={items} />
)
}
You sync data using Electric, like this:
import { useShape } from '@electric-sql/react'
const MyComponent = () => {
const { data } = useShape({
url: `https://electric.example.com/v1/shape`,
params: {
table: 'items'
}
})
return (
<List items={data} />
)
}
For example:
- Trigger.dev started out with Electric by syncing status data from their background jobs platform into their Realtime dashboard
- Otto swapped out the way they loaded data into their AI spreadsheet
You can go much further with Electric, all the way to syncing into a local database. But you can do this incrementally as and when you need to.
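For example, a rough sketch of going all the way and syncing a shape into an embedded PGlite database might look like the following. This assumes the @electric-sql/pglite-sync plugin and its syncShapeToTable API; treat the exact option names as illustrative, as they can differ between versions:
import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'

// Create a local, embedded Postgres with the Electric sync plugin.
const pg = await PGlite.create({
  extensions: { electric: electricSync() },
})

// The local table that the shape will be written into.
await pg.exec(`
  CREATE TABLE IF NOT EXISTS items (
    id UUID PRIMARY KEY,
    title TEXT
  )
`)

// Keep the local table in sync with the `items` shape.
await pg.electric.syncShapeToTable({
  shape: {
    url: 'https://electric.example.com/v1/shape',
    params: { table: 'items' },
  },
  table: 'items',
  primaryKey: ['id'],
  shapeKey: 'items',
})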
Read-path
Electric only does read-path sync. It syncs data out of Postgres, into local apps.
Electric does not do write-path sync. It does not provide (or prescribe) a solution for getting data back into Postgres from local apps and services. In fact, it's explicitly designed for you to handle writes yourself.
HTTP
The other key thing about Electric sync is that it's just JSON over HTTP.
Because it's JSON you can parse it and work with it in any language and environment. Because it's HTTP you can proxy it. Which means you can use existing HTTP services and middleware to authorize access to it.
In fact, whatever you want to do to the replication stream — encrypt, filter, transform, split, remix, buffer, you name it — you can do through a proxy. Extensibility is built in at the protocol layer.
Using your existing API
So far, we've seen that Electric handles read-path sync and leaves writes up to you. We've seen how it syncs over HTTP and how this allows you to implement auth and other concerns like encryption and filtering using proxies.
Now, let's dive into these aspects and see exactly how to implement them using your existing API, with code samples and links to example apps.
Auth
Web-service based apps typically authorize access to resources in a controller or middleware layer. When switching to use a sync engine without an API, you cut out these layers and typically need to codify your auth logic as database rules.
For example in Firebase you have Security Rules that look like this:
service <<name>> {
// Match the resource path.
match <<path>> {
// Allow the request if the following conditions are true.
allow <<methods>> : if <<condition>>
}
}
In Postgres-based systems, like Supabase Realtime you use Postgres Row Level Security (RLS) rules, e.g.:
create policy "Individuals can view their own todos."
on todos for select
using ( (select auth.uid()) = user_id );
With Electric, you don't need to do this. Electric syncs over HTTP. You make HTTP requests to a Shape endpoint (see spec here) at:
GET /v1/shape
Because this is an HTTP resource, you can authorize access to it just as you would any other web service resource: using HTTP middleware. Route the request to Electric through an authorizing proxy that you control:
API proxy
You can see this pattern implemented in the Proxy auth example.
This defines a proxy that takes an HTTP request, reads the user credentials from an Authorization header, uses them to authorize the request and, if successful, proxies the request on to Electric:
export async function GET(request: Request) {
const url = new URL(request.url)
// Construct the upstream URL
const originUrl = new URL(`http://localhost:3000/v1/shape`)
url.searchParams.forEach((value, key) => {
originUrl.searchParams.set(key, value)
})
// authentication and authorization
// Note: in a real-world authentication scheme, this is where you would
// verify the authentication token and load the user. To keep this example simple,
// we're just passing directly through the org_id.
const org_id = request.headers.get(`authorization`)
let user
if (org_id) {
user = { org_id, isAdmin: org_id === `admin` }
}
// If the user isn't set, return 401
if (!user) {
return new Response(`authorization header not found`, { status: 401 })
}
// Only query orgs the user has access to.
if (!user.isAdmin) {
originUrl.searchParams.set(`where`, `"org_id" = ${user.org_id}`)
}
// When proxying long-polling requests, content-encoding & content-length are added
// erroneously (saying the body is gzipped when it's not) so we'll just remove
// them to avoid content decoding errors in the browser.
//
// Similar-ish problem to https://github.com/wintercg/fetch/issues/23
let resp = await fetch(originUrl.toString())
if (resp.headers.get(`content-encoding`)) {
const headers = new Headers(resp.headers)
headers.delete(`content-encoding`)
headers.delete(`content-length`)
resp = new Response(resp.body, {
status: resp.status,
statusText: resp.statusText,
headers,
})
}
return resp
}
You can run this kind of proxy as part of your existing backend API. Here's another example, this time using a Plug to authorize requests to a Phoenix application:
defmodule ApiWeb.Plugs.Auth.VerifyToken do
@moduledoc """
Verify that the auth token in the Authorization header matches the shape.
We do this by comparing the shape defined in the request query params with
the shape signed into the auth token claims.
So you can't proxy a shape request without having a signed token for
that exact shape definition.
"""
use ApiWeb, :plug
alias ApiWeb.Authenticator
def init(opts), do: opts
def call(%{assigns: %{shape: shape}, req_headers: headers} = conn, _opts) do
case Authenticator.authorize(shape, headers) do
{:error, message} when message in [:invalid, :missing] ->
conn
|> send_resp(401, "Unauthorized")
|> halt()
false ->
conn
|> send_resp(403, "Forbidden")
|> halt()
true ->
conn
end
end
end
Edge proxy
If you're running Electric behind a CDN, you're likely to want to deploy your authorizing proxy in front of the CDN. Otherwise routing requests through your API adds latency and can become a bottleneck. You can achieve this by deploying your proxy as an edge function or worker in front of the CDN, for example using Cloudflare Workers or Supabase Edge Functions.
Here's a Supabase edge function using Deno that verifies that the shape definition in a JWT matches the shape definition in the request params:
import jwt from 'jsonwebtoken'
const AUTH_SECRET = Deno.env.get("AUTH_SECRET") || "NFL5*0Bc#9U6E@tnmC&E7SUN6GwHfLmY"
const ELECTRIC_URL = Deno.env.get("ELECTRIC_URL") || "http://localhost:3000"
interface ShapeDefinition {
table: string
columns?: string
namespace?: string
where?: string
}
/**
* Match `GET /v1/shape` requests.
*/
function isGetShapeRequest(method: string, path: string) {
return method === 'GET' && path.endsWith('/v1/shape')
}
/**
* Allow requests with a valid JWT in the auth header.
*/
function verifyAuthHeader(headers: Headers) {
const auth_header = headers.get("Authorization")
if (auth_header === null) {
return [false, null]
}
const token = auth_header.split("Bearer ")[1]
try {
const claims = jwt.verify(token, AUTH_SECRET, {algorithms: ["HS256"]})
return [true, claims]
}
catch (err) {
console.warn(err)
return [false, null]
}
}
/**
* Allow requests where the signed `shape` definition in the JWT claims
* matches the shape definition in the request `params`.
*/
function matchesDefinition(shape: ShapeDefinition, params: URLSearchParams) {
if (shape === null || !shape.hasOwnProperty('table')) {
return false
}
// Use loose inequality so a missing (undefined) namespace is handled too.
const table = shape.namespace != null
? `${shape.namespace}.${shape.table}`
: shape.table
if (table === null || table !== params.get('table')) {
return false
}
if (shape.where !== params.get('where')) {
return false
}
if (shape.columns !== params.get('columns')) {
return false
}
return true
}
// Handle requests to the server / edge function.
Deno.serve((req) => {
const url = new URL(req.url)
if (!isGetShapeRequest(req.method, url.pathname)) {
return new Response("Not found", {status: 404})
}
const [isValidJWT, claims] = verifyAuthHeader(req.headers)
if (!isValidJWT) {
return new Response("Unauthorized", {status: 401})
}
if (!matchesDefinition(claims.shape, url.searchParams)) {
return new Response("Forbidden", {status: 403})
}
// Reverse-proxy the request on to the Electric sync service.
return fetch(`${ELECTRIC_URL}/v1/shape${url.search}`, {headers: req.headers})
})
External services
You can also use external authorization services in your proxy.
For example, Authzed is a low-latency, distributed authorization service based on Google Zanzibar. You can use it in an edge proxy to authorize requests in front of a CDN, whilst still ensuring strong consistency for your authorization logic.
import jwt from 'jsonwebtoken'
import { v1 } from '@authzed/authzed-node'
const AUTH_SECRET = Deno.env.get("AUTH_SECRET") || "NFL5*0Bc#9U6E@tnmC&E7SUN6GwHfLmY"
const ELECTRIC_URL = Deno.env.get("ELECTRIC_URL") || "http://localhost:3000"
const HAS_PERMISSION = v1.CheckPermissionResponse_Permissionship.HAS_PERMISSION
function verifyAuthHeader(headers: Headers) {
const auth_header = headers.get("Authorization")
if (auth_header === null) {
return [false, null]
}
const token = auth_header.split("Bearer ")[1]
try {
const claims = jwt.verify(token, AUTH_SECRET, {algorithms: ["HS256"]})
return [true, claims]
}
catch (err) {
console.warn(err)
return [false, null]
}
}
Deno.serve(async (req) => {
const url = new URL(req.url)
const [isValidJWT, claims] = verifyAuthHeader(req.headers)
if (!isValidJWT) {
return new Response("Unauthorized", {status: 401})
}
// See https://github.com/authzed/authzed-node and
// https://authzed.com/docs/spicedb/getting-started/discovering-spicedb
const client = v1.NewClient(claims.token)
const resource = v1.ObjectReference.create({
objectType: `example/table`,
objectId: claims.table
})
const user = v1.ObjectReference.create({
objectType: "example/user",
objectId: claims.user_id
})
const subject = v1.SubjectReference.create({
object: user
})
const permissionRequest = v1.CheckPermissionRequest.create({
permission: 'read',
resource,
subject
})
const checkResult = await new Promise(
(resolve, reject) => {
client.checkPermission(
permissionRequest,
(err, response) => err ? reject(err) : resolve(response)
)
}
)
if (checkResult.permissionship !== HAS_PERMISSION) {
return new Response("Forbidden", {status: 403})
}
return fetch(`${ELECTRIC_URL}/v1/shape${url.search}`, {headers: req.headers})
})
Gatekeeper pattern
Another pattern, illustrated in our gatekeeper-auth example, is to:
- use an API endpoint to authorize shape access
- generate shape-scoped auth tokens
- validate these tokens in the proxy
This allows you to keep more of your auth logic in your API and minimise what's executed on the "hot path" of the proxy. This is actually what the code example shown in the edge proxy section above does, using an edge worker to validate a shape-scoped auth token.
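The gatekeeper endpoint itself just needs to authenticate the user and sign a token scoped to a specific shape. The example implements this endpoint in Elixir; purely as an illustration, a minimal Express sketch of the same idea might look like this (the route, secret handling and response format here are assumptions, modelled on the curl example further below):
import express from 'express'
import jwt from 'jsonwebtoken'

const AUTH_SECRET = process.env.AUTH_SECRET || 'NFL5*0Bc#9U6E@tnmC&E7SUN6GwHfLmY'
const PROXY_URL = process.env.PROXY_URL || 'http://localhost:4000/proxy/v1/shape'

const app = express()

// Authenticate the user (omitted here), then sign a shape-scoped token.
// A real endpoint could also scope the `where` and `columns` params.
app.post('/gatekeeper/:table', (req, res) => {
  const shape = { table: req.params.table }
  const token = jwt.sign({ shape }, AUTH_SECRET, {
    algorithm: 'HS256',
    expiresIn: '1h',
  })
  res.json({
    headers: { Authorization: `Bearer ${token}` },
    url: PROXY_URL,
    table: shape.table,
  })
})

app.listen(4000)
The proxy in front of Electric then only has to validate the token and check that it matches the shape being requested.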
You can also achieve the same thing using a standard reverse proxy like Caddy, Nginx or Varnish. For example, using Caddy:
{
order jwtauth before basicauth
}
:8080 {
jwtauth {
# You can sign and validate JWT tokens however you prefer. Here we
# expect tokens to have been signed with the `HS256` algorithm and
# a shared symmetric signing key to match the configuration in
# `../api/lib/api/token.ex`, so that this example config validates
# tokens generated by the example Api service.
#
# Note that the signing key should be base64 encoded:
#
# sign_key "<secret_key_bytes_in_base64_format>"
#
# See https://caddyserver.com/docs/modules/http.authentication.providers.jwt
sign_key {$AUTH_SECRET:"TkZMNSowQmMjOVU2RUB0bm1DJkU3U1VONkd3SGZMbVk="}
sign_alg HS256
# The jwtauth module requires a user claim but we don't actually use
# it here, so we just set it to the token issuer.
user_claims iss
# Extract the shape definition from the JWT `shape` claim and write
# into {http.auth.user.*} variables, so e.g.: the `shape.table`
# becomes {http.auth.user.table} and is used below to match against
# the request parameters.
meta_claims \
"shape.namespace -> namespace" \
"shape.table -> table" \
"shape.where -> where" \
"shape.columns -> columns"
}
# Match `GET /v1/shape` requests.
@get_shape {
method GET
path /v1/shape
}
# Match requests whose JWT shape definition matches the shape definition
# in the request parameters.
#
# So, for example, a claim of `{"shape": {"table": "items"}}` will match
# a query parameter of `?table=items`.
#
# Note that the first part of the expression matches the request table
# param against either the shape `table` or `namespace.table` depending
# on whether the shape `namespace` is empty or not.
@definition_matches {
expression <<CEL
(
{http.auth.user.namespace} == ""
? {http.auth.user.table} == {http.request.uri.query.table}
: {http.auth.user.namespace} + "." + {http.auth.user.table} == {http.request.uri.query.table}
)
&& {http.auth.user.where} == {http.request.uri.query.where}
&& {http.auth.user.columns} == {http.request.uri.query.columns}
CEL
}
# Route the request according to the matchers.
handle @get_shape {
handle @definition_matches {
reverse_proxy {$ELECTRIC_URL:"http://localhost:3000"}
}
respond "Forbidden" 403 {
close
}
}
respond "Not found" 404 {
close
}
}
The workflow from the client's point of view is to first hit the gatekeeper endpoint to generate a shape-scoped auth token, e.g.:
$ curl -sX POST "http://localhost:4000/gatekeeper/items" | jq
{
"headers": {
"Authorization": "Bearer <token>"
},
"url": "http://localhost:4000/proxy/v1/shape",
"table": "items"
}
Then use the token to authorize requests to Electric, via the proxy, e.g.:
$ curl -sv --header "Authorization: Bearer <token>" \
"http://localhost:4000/proxy/v1/shape?table=items&offset=-1"
...
< HTTP/1.1 200 OK
...
The TypeScript client supports auth headers and 401 / 403 error handling, so you can wrap this up using, e.g.:
import { FetchError, Shape, ShapeStream } from '@electric-sql/client'
const API_URL = process.env.API_URL || 'http://localhost:4000'
/*
* Makes a request to the gatekeeper endpoint to fetch a config object
* in the format expected by the ShapeStreamOptions including the
* proxy `url` to connect to and auth `headers`.
*/
async function fetchConfig() {
const url = `${API_URL}/gatekeeper/items`
const resp = await fetch(url, {method: 'POST'})
return await resp.json()
}
// Stream the shape through the proxy, using the url and auth headers
// provided by the gatekeeper.
const config = await fetchConfig()
const stream = new ShapeStream({
...config,
onError: async (error) => {
if (error instanceof FetchError) {
const status = error.status
console.log('handling fetch error: ', status)
// If the auth token is invalid or expires, hit the gatekeeper
// again to update the auth headers and thus keep streaming
// without interruption.
if (status === 401 || status === 403) {
return await fetchConfig()
}
}
throw error
}
})
// Materialize the stream into a `Shape` and subscribe to data changes
// so we can see the client working.
const shape = new Shape(stream)
shape.subscribe(({ rows }) => {
console.log('num rows: ', rows ? rows.length : 0)
})
Writes
Electric does read-path sync. That's the bit between Postgres and the client in the diagram below. Electric does not handle writes. That's the dashed blue arrows around the outside, back from the client into Postgres:
Instead, Electric is designed for you to implement writes yourself. There's a comprehensive Writes guide and a Write patterns example that walk through a range of approaches, all of which integrate with your existing API.
You can also see a number of the examples that use an API for writes, including the Linearlite, Phoenix LiveView and Tanstack examples.
API server
To highlight a couple of the key patterns, let's look at the shared API server for the write-patterns example. It is an Express app that exposes the write methods of a REST API for a table of todos:
- POST {todo} /todos to create a todo
- PUT {partial-todo} /todos/:id to update a todo
- DELETE /todos/:id to delete a todo
import bodyParser from 'body-parser'
import cors from 'cors'
import express from 'express'
import pg from 'pg'
import { z } from 'zod'
// Connect to Postgres.
const DATABASE_URL =
process.env.DATABASE_URL ||
'postgresql://postgres:password@localhost:54321/electric'
const pool = new pg.Pool({ connectionString: DATABASE_URL })
// Expose an HTTP server.
const PORT = parseInt(process.env.PORT || '3001')
const app = express()
app.use(bodyParser.json())
app.use(cors())
// Validate user input
const idSchema = z.string().uuid()
const createSchema = z.object({
id: z.string().uuid(),
title: z.string(),
created_at: z.string(),
write_id: z.string().optional(),
})
const updateSchema = z.object({
completed: z.boolean(),
write_id: z.string().optional(),
})
// Define functions to create, update and delete todos
// using the `db` client.
const createTodo = async (id, title, created_at, write_id) => {
const sql = `
INSERT INTO todos (id, title, completed, created_at, write_id)
VALUES ($1, $2, false, $3, $4)
`
const params = [id, title, created_at, write_id || null]
await pool.query(sql, params)
}
const updateTodo = async (id, completed, write_id) => {
const sql = `
UPDATE todos SET completed = $1, write_id = $2
WHERE id = $3
`
const params = [completed ? '1' : '0', write_id || null, id]
await pool.query(sql, params)
}
const deleteTodo = async (id) => {
const sql = 'DELETE from todos where id = $1'
const params = [id]
await pool.query(sql, params)
}
// Expose the shared REST API to create, update and delete todos.
app.post('/todos', async (req, res) => {
let data
try {
data = createSchema.parse(req.body)
} catch (err) {
return res.status(400).json({ errors: err.errors })
}
try {
await createTodo(data.id, data.title, data.created_at, data.write_id)
} catch (err) {
return res.status(500).json({ errors: err })
}
return res.status(200).json({ status: 'OK' })
})
app.put('/todos/:id', async (req, res) => {
let id, data
try {
id = idSchema.parse(req.params.id)
data = updateSchema.parse(req.body)
} catch (err) {
return res.status(400).json({ errors: err.errors })
}
try {
await updateTodo(id, data.completed, data.write_id)
} catch (err) {
return res.status(500).json({ errors: err })
}
return res.status(200).json({ status: 'OK' })
})
app.delete('/todos/:id', async (req, res) => {
let id
try {
id = idSchema.parse(req.params.id)
} catch (err) {
return res.status(400).json({ errors: err.errors })
}
try {
await deleteTodo(id)
} catch (err) {
return res.status(500).json({ errors: err })
}
return res.status(200).json({ status: 'OK' })
})
// And expose a `POST /changes` route specifically to support the
// through the DB sync pattern.
const transactionsSchema = z.array(
z.object({
id: z.string(),
changes: z.array(
z.object({
operation: z.string(),
value: z.object({
id: z.string().uuid(),
title: z.string().optional(),
completed: z.boolean().optional(),
created_at: z.string().optional(),
}),
write_id: z.string(),
})
),
})
)
app.post('/changes', async (req, res) => {
let data
try {
data = transactionsSchema.parse(req.body)
} catch (err) {
return res.status(400).json({ errors: err.errors })
}
const client = await pool.connect()
try {
await client.query('BEGIN')
// Note: the helpers above are async, so we use `for ... of` loops and
// `await` each write (rather than `forEach`) to make sure every write
// has completed, and any error is caught, before we respond.
for (const tx of data) {
  for (const { operation, value, write_id } of tx.changes) {
    switch (operation) {
      case 'insert':
        await createTodo(value.id, value.title, value.created_at, write_id)
        break
      case 'update':
        await updateTodo(value.id, value.completed, write_id)
        break
      case 'delete':
        await deleteTodo(value.id)
        break
    }
  }
}
await client.query('COMMIT')
} catch (err) {
await client.query('ROLLBACK')
return res.status(500).json({ errors: err })
} finally {
await client.release()
}
return res.status(200).json({ status: 'OK' })
})
// Start the server
app.listen(PORT, () => {
console.log(`Server listening at port ${PORT}`)
})
Optimistic writes
If you then look at the optimistic state pattern (one of the approaches illustrated in the write-patterns example) you can see this being used, together with Electric sync, to support instant, local, offline-capable writes:
import React, { useOptimistic, useTransition } from 'react'
import { v4 as uuidv4 } from 'uuid'
import { matchBy, matchStream } from '@electric-sql/experimental'
import { useShape } from '@electric-sql/react'
import api from '../../shared/app/client'
import { ELECTRIC_URL, envParams } from '../../shared/app/config'
type Todo = {
id: string
title: string
completed: boolean
created_at: Date
}
type PartialTodo = Partial<Todo> & {
id: string
}
type Write = {
operation: 'insert' | 'update' | 'delete'
value: PartialTodo
}
export default function OptimisticState() {
const [isPending, startTransition] = useTransition()
// Use Electric's `useShape` hook to sync data from Postgres
// into a React state variable.
//
// Note that we also unpack the `stream` from the useShape
// return value, so that we can monitor it below to detect
// local writes syncing back from the server.
const { isLoading, data, stream } = useShape<Todo>({
url: `${ELECTRIC_URL}/v1/shape`,
params: {
table: 'todos',
...envParams,
},
parser: {
timestamptz: (value: string) => new Date(value),
},
})
const sorted = data ? data.sort((a, b) => +a.created_at - +b.created_at) : []
// Use React's built in `useOptimistic` hook. This provides
// a mechanism to apply local optimistic state whilst writes
// are being sent-to and syncing-back-from the server.
const [todos, addOptimisticState] = useOptimistic(
sorted,
(synced: Todo[], { operation, value }: Write) => {
switch (operation) {
case 'insert':
return synced.some((todo) => todo.id === value.id)
? synced
: [...synced, value as Todo]
case 'update':
return synced.map((todo) =>
todo.id === value.id ? { ...todo, ...value } : todo
)
case 'delete':
return synced.filter((todo) => todo.id !== value.id)
}
}
)
// These are the same event handler functions from the online
// example, extended with `startTransition` -> `addOptimisticState`
// to apply local optimistic state.
//
// Note that the local state is applied:
//
// 1. whilst the HTTP request is being made to the API server; and
// 2. until the write syncs back through the Electric shape stream
//
// This is slightly different from most optimistic state examples
// because we wait for the sync as well as the api request.
async function createTodo(event: React.FormEvent) {
event.preventDefault()
const form = event.target as HTMLFormElement
const formData = new FormData(form)
const title = formData.get('todo') as string
const path = '/todos'
const data = {
id: uuidv4(),
title: title,
created_at: new Date(),
completed: false,
}
startTransition(async () => {
addOptimisticState({ operation: 'insert', value: data })
const fetchPromise = api.request(path, 'POST', data)
const syncPromise = matchStream(
stream,
['insert'],
matchBy('id', data.id)
)
await Promise.all([fetchPromise, syncPromise])
})
form.reset()
}
async function updateTodo(todo: Todo) {
const { id, completed } = todo
const path = `/todos/${id}`
const data = {
id,
completed: !completed,
}
startTransition(async () => {
addOptimisticState({ operation: 'update', value: data })
const fetchPromise = api.request(path, 'PUT', data)
const syncPromise = matchStream(stream, ['update'], matchBy('id', id))
await Promise.all([fetchPromise, syncPromise])
})
}
async function deleteTodo(event: React.MouseEvent, todo: Todo) {
event.preventDefault()
const { id } = todo
const path = `/todos/${id}`
startTransition(async () => {
addOptimisticState({ operation: 'delete', value: { id } })
const fetchPromise = api.request(path, 'DELETE')
const syncPromise = matchStream(stream, ['delete'], matchBy('id', id))
await Promise.all([fetchPromise, syncPromise])
})
}
if (isLoading) {
return <div className="loading">Loading …</div>
}
// The template below the heading is identical to the other patterns.
// prettier-ignore
return (
<div id="optimistic-state" className="example">
<h3>
<span className="title">
2. Optimistic state
</span>
<span className={isPending ? 'pending' : 'pending hidden'} />
</h3>
<ul>
{todos.map((todo) => (
<li key={todo.id}>
<label>
<input type="checkbox" checked={todo.completed}
onChange={() => updateTodo(todo)}
/>
<span className={`title ${ todo.completed ? 'completed' : '' }`}>
{ todo.title }
</span>
</label>
<a href="#delete" className="close"
onClick={(event) => deleteTodo(event, todo)}>
✕</a>
</li>
))}
{todos.length === 0 && (
<li>All done 🎉</li>
)}
</ul>
<form onSubmit={createTodo}>
<input type="text" name="todo"
placeholder="Type here …"
required
/>
<button type="submit">
Add
</button>
</form>
</div>
)
}
You can also see the shared persistent optimistic state pattern for a more resilient, comprehensive approach to building local-first apps on optimistic state with Electric.
Write-path sync
Another pattern covered in the Writes guide is through-the-database sync. This approach uses Electric to sync into a local, embedded database and then syncs changes made to the local database back to Postgres, via your API.
The example implementation uses Electric to sync into PGlite as the local embedded database. All the application code needs to do is read and write to the local database. The database schema takes care of everything else, including keeping a log of local changes to send to the server.
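To give a feel for what that log can look like, here's a simplified, hypothetical sketch of a local changes table in PGlite. The real example's schema (with its triggers and local table) is richer; the column names here just mirror the Change type used by the sync utility below:
import { PGlite } from '@electric-sql/pglite'

const pg = await PGlite.create()

// A minimal local change log. Each row records one local write.
await pg.exec(`
  CREATE TABLE IF NOT EXISTS changes (
    id BIGSERIAL PRIMARY KEY,
    operation TEXT NOT NULL,      -- 'insert' | 'update' | 'delete'
    value JSONB NOT NULL,         -- the row data for the write
    write_id UUID NOT NULL,       -- used to match the write when it syncs back
    transaction_id TEXT NOT NULL  -- groups changes into transactions
  )
`)

// Application code (or a trigger) records each local write ...
await pg.query(
  `INSERT INTO changes (operation, value, write_id, transaction_id)
   VALUES ($1, $2, $3, $4)`,
  [
    'insert',
    JSON.stringify({ id: crypto.randomUUID(), title: 'Buy bread' }),
    crypto.randomUUID(),
    '1',
  ]
)

// ... and notifies listeners, so the sync utility below wakes up and
// POSTs the pending changes to the server.
await pg.exec(`NOTIFY changes`)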
This is then processed by a sync utility that sends data to a POST {transactions} /changes endpoint, implemented in the shared API server shown above:
import { type Operation } from '@electric-sql/client'
import { type PGliteWithLive } from '@electric-sql/pglite/live'
import api from '../../shared/app/client'
type Change = {
id: number
operation: Operation
value: {
id: string
title?: string
completed?: boolean
created_at?: Date
}
write_id: string
transaction_id: string
}
type SendResult = 'accepted' | 'rejected' | 'retry'
/*
* Minimal, naive synchronization utility, just to illustrate the pattern of
* `listen`ing to `changes` and `POST`ing them to the api server.
*/
export default class ChangeLogSynchronizer {
#db: PGliteWithLive
#position: number
#hasChangedWhileProcessing: boolean = false
#shouldContinue: boolean = true
#status: 'idle' | 'processing' = 'idle'
#abortController?: AbortController
#unsubscribe?: () => Promise<void>
constructor(db: PGliteWithLive, position = 0) {
this.#db = db
this.#position = position
}
/*
* Start by listening for notifications.
*/
async start(): Promise<void> {
this.#abortController = new AbortController()
this.#unsubscribe = await this.#db.listen('changes', this.handle.bind(this))
this.process()
}
/*
* On notify, either kick off processing or note down that there were changes
* so we can process them straightaway on the next loop.
*/
async handle(): Promise<void> {
if (this.#status === 'processing') {
this.#hasChangedWhileProcessing = true
return
}
this.#status = 'processing'
this.process()
}
// Process the changes by fetching them and posting them to the server.
// If the changes are accepted then proceed, otherwise rollback or retry.
async process(): Promise<void> {
this.#hasChangedWhileProcessing = false
const { changes, position } = await this.query()
if (changes.length) {
const result: SendResult = await this.send(changes)
switch (result) {
case 'accepted':
await this.proceed(position)
break
case 'rejected':
await this.rollback()
break
case 'retry':
this.#hasChangedWhileProcessing = true
break
}
}
if (this.#hasChangedWhileProcessing && this.#shouldContinue) {
return await this.process()
}
this.#status = 'idle'
}
/*
* Fetch the current batch of changes
*/
async query(): Promise<{ changes: Change[]; position: number }> {
const { rows } = await this.#db.sql<Change>`
SELECT * from changes
WHERE id > ${this.#position}
ORDER BY id asc
`
const position = rows.length ? rows.at(-1)!.id : this.#position
return {
changes: rows,
position,
}
}
/*
* Send the current batch of changes to the server, grouped by transaction.
*/
async send(changes: Change[]): Promise<SendResult> {
const path = '/changes'
const groups = Object.groupBy(changes, (x) => x.transaction_id)
const sorted = Object.entries(groups).sort((a, b) =>
a[0].localeCompare(b[0])
)
const transactions = sorted.map(([transaction_id, changes]) => {
return {
id: transaction_id,
changes: changes,
}
})
const signal = this.#abortController?.signal
let response: Response | undefined
try {
response = await api.request(path, 'POST', transactions, signal)
} catch (_err) {
return 'retry'
}
if (response === undefined) {
return 'retry'
}
if (response.ok) {
return 'accepted'
}
return response.status < 500 ? 'rejected' : 'retry'
}
/*
* Proceed by clearing the processed changes and moving the position forward.
*/
async proceed(position: number): Promise<void> {
await this.#db.sql`
DELETE from changes
WHERE id <= ${position}
`
this.#position = position
}
/*
* Rollback with an extremely naive strategy: if any write is rejected, simply
* wipe the entire local state.
*/
async rollback(): Promise<void> {
await this.#db.transaction(async (tx) => {
await tx.sql`DELETE from changes`
await tx.sql`DELETE from todos_local`
})
}
/*
* Stop synchronizing
*/
async stop(): Promise<void> {
this.#shouldContinue = false
if (this.#abortController !== undefined) {
this.#abortController.abort()
}
if (this.#unsubscribe !== undefined) {
await this.#unsubscribe()
}
}
}
Authorizing writes
Just as with reads, because you're sending writes to an API endpoint, you can use your API, middleware or a proxy to authorize them, just as you would any other API request.
Again, to emphasise, this allows you to develop local-first apps without having to codify write-path authorization logic into database rules. In fact, in many cases, you can just keep your existing API endpoints and you may not need to change any code at all.
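For example, here's a minimal sketch of authorizing writes with Express middleware in front of the todos routes from the API server above. The verifyToken helper is a placeholder standing in for whatever token or session verification your API already does:
import express from 'express'

const app = express()

// Placeholder: verify the bearer token and load the user. In a real app
// this would check a JWT or session, e.g. with `jsonwebtoken`.
async function verifyToken(token) {
  return token === 'valid-token' ? { id: 'user-1', org_id: 'org-1' } : null
}

// Authorize every write to /todos before it reaches the route handlers.
app.use('/todos', async (req, res, next) => {
  const authHeader = req.headers.authorization || ''
  const token = authHeader.replace('Bearer ', '')
  const user = await verifyToken(token)
  if (!user) {
    return res.status(401).json({ error: 'Unauthorized' })
  }
  // Make the user available to the handlers, e.g. to scope writes by org.
  res.locals.user = user
  next()
})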
Encryption
Electric syncs ciphertext as well as it syncs plaintext. You can encrypt data on and off the local client, i.e.:
- encrypt it before it leaves the client
- decrypt it when it comes into the client from the replication stream
You can see an example of this in the encryption example:
import base64 from 'base64-js'
import React, { useEffect, useState } from 'react'
import { useShape } from '@electric-sql/react'
import './Example.css'
type Item = {
id: string
title: string
}
type EncryptedItem = {
id: string
ciphertext: string
iv: string
}
const API_URL = import.meta.env.API_URL || 'http://localhost:3001'
const ELECTRIC_URL = import.meta.env.ELECTRIC_URL ?? 'http://localhost:3000'
// For this example, we hardcode a deterministic key that works across page loads.
// In a real app, you would implement a key management strategy. Electric is great
// at syncing keys between users :)
const rawKey = new Uint8Array(16)
const key = await crypto.subtle.importKey('raw', rawKey, 'AES-GCM', true, [
'encrypt',
'decrypt',
])
/*
* Encrypt an `Item` into an `EncryptedItem`.
*/
async function encrypt(item: Item): Promise<EncryptedItem> {
const { id, title } = item
const enc = new TextEncoder()
const encoded = enc.encode(title)
const iv = crypto.getRandomValues(new Uint8Array(12))
const encrypted = await crypto.subtle.encrypt(
{
iv,
name: 'AES-GCM',
},
key,
encoded
)
const ciphertext = base64.fromByteArray(new Uint8Array(encrypted))
const iv_str = base64.fromByteArray(iv)
return {
id,
ciphertext,
iv: iv_str,
}
}
/*
* Decrypt an `EncryptedItem` to an `Item`.
*/
async function decrypt(item: EncryptedItem): Promise<Item> {
const { id, ciphertext, iv: iv_str } = item
const encrypted = base64.toByteArray(ciphertext)
const iv = base64.toByteArray(iv_str)
const decrypted = await crypto.subtle.decrypt(
{
iv,
name: 'AES-GCM',
},
key,
encrypted
)
const dec = new TextDecoder()
const title = dec.decode(decrypted)
return {
id,
title,
}
}
export const Example = () => {
const [items, setItems] = useState<Item[]>()
const { data } = useShape<EncryptedItem>({
url: `${ELECTRIC_URL}/v1/shape`,
params: {
table: 'items',
},
})
const rows = data !== undefined ? data : []
// There are more efficient ways of updating state than always decrypting
// all the items on any change, but just to demonstrate the decryption ...
useEffect(() => {
async function init() {
const items = await Promise.all(
rows.map(async (row) => await decrypt(row))
)
setItems(items)
}
init()
}, [rows])
/*
* Handle adding an item by creating the item data, encrypting it
* and sending it to the API
*/
async function createItem(event: React.FormEvent) {
event.preventDefault()
const form = event.target as HTMLFormElement
const formData = new FormData(form)
const title = formData.get('title') as string
const id = crypto.randomUUID()
const item = {
id,
title,
}
const data = await encrypt(item)
const url = `${API_URL}/items`
const options = {
method: 'POST',
body: JSON.stringify(data),
headers: {
'Content-Type': 'application/json',
},
}
await fetch(url, options)
form.reset()
}
if (items === undefined) {
return <div>Loading...</div>
}
return (
<div>
<div>
{items.map((item: Item, index: number) => (
<p key={index} className="item">
<code>{item.title}</code>
</p>
))}
</div>
<form onSubmit={createItem}>
<input
type="text"
name="title"
placeholder="Type here …"
required
/>
<button type="submit">Add</button>
</form>
</div>
)
}
Key management
One of the challenges with encryption is key management, i.e. choosing which data to encrypt with which keys and sharing the right keys with the right users.
There are some good patterns here, like using a key per resource, such as a tenant, workspace or group. You can then encrypt data within that resource using a specific key and share the key with users when they get access to the resource (e.g. when they're added to the group).
Electric is good at syncing keys. For example, you could define a shape like:
const stream = new ShapeStream({
url: `${ELECTRIC_URL}/v1/shape`,
params: {
table: 'tenants',
columns: [
'keys'
],
where: `id in ('${user.tenant_ids.join(`', '`)}')`
}
})
Either in your client or in your proxy. You could then put a denormalised tenant_id column on all of your rows and look up the correct key to use when decrypting and encrypting the row.
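As a hypothetical sketch building on the encryption example above, you could keep the synced keys in a map keyed by tenant_id and pick the right one per row (the EncryptedRow shape and tenantKeys map here are assumptions):
type EncryptedRow = {
  id: string
  tenant_id: string
  ciphertext: string
  iv: string
}

// tenant_id -> key, populated from the synced `tenants.keys` column shown
// above (after importing each raw key with crypto.subtle.importKey).
const tenantKeys = new Map<string, CryptoKey>()

// Look up the right key for the row's tenant before decrypting.
async function decryptRow(row: EncryptedRow): Promise<string> {
  const key = tenantKeys.get(row.tenant_id)
  if (!key) {
    throw new Error(`No key synced for tenant ${row.tenant_id}`)
  }
  const iv = Uint8Array.from(atob(row.iv), (c) => c.charCodeAt(0))
  const ciphertext = Uint8Array.from(atob(row.ciphertext), (c) => c.charCodeAt(0))
  const decrypted = await crypto.subtle.decrypt(
    { name: 'AES-GCM', iv },
    key,
    ciphertext
  )
  return new TextDecoder().decode(decrypted)
}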
Filtering
The HTTP API streams a log of change operations. You can intercept this at any level -- in your API, in a middleware proxy or when handling or materialising the log from a ShapeStream instance in the client.
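For example, here's a small sketch of filtering in the client from a ShapeStream subscription. It assumes the todos table from the examples above and uses the client's isChangeMessage helper to skip control messages:
import { ShapeStream, isChangeMessage } from '@electric-sql/client'

type Todo = {
  id: string
  title: string
  completed: boolean
}

const stream = new ShapeStream<Todo>({
  url: 'https://electric.example.com/v1/shape',
  params: { table: 'todos' },
})

// Subscribe to batches of log messages and only act on the changes
// we care about, ignoring everything else.
stream.subscribe((messages) => {
  for (const message of messages) {
    if (!isChangeMessage(message)) {
      continue // skip control messages like `up-to-date`
    }
    if (message.headers.operation === 'insert' && message.value.completed) {
      console.log('new completed todo synced:', message.value.id)
    }
  }
})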
Using your existing tools
Because Electric syncs over HTTP, it integrates with standard debugging, visibility and monitoring tools.
Monitoring
You can see Electric requests in your standard HTTP logs. You can catch errors and send them with request-specific context to systems like Sentry and AppSignal.
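For instance, a minimal sketch of forwarding shape stream errors to Sentry might look like this (it assumes you've initialised the Sentry SDK elsewhere; the extra context fields are just examples):
import * as Sentry from '@sentry/browser'
import { ShapeStream } from '@electric-sql/client'

const stream = new ShapeStream({
  url: 'https://electric.example.com/v1/shape',
  params: { table: 'items' },
  // Report sync errors, with some request-specific context attached.
  onError: (error) => {
    Sentry.captureException(error, {
      extra: { table: 'items', source: 'electric-sync' },
    })
  },
})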
You can debug on the command line using curl.
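For example, you can request the first chunk of a shape log directly from a local Electric service (offset=-1 asks for the log from the start):
$ curl -i 'http://localhost:3000/v1/shape?table=items&offset=-1'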
Browser console
One of the nicest aspects of this is being able to see and easily introspect sync requests in the browser console. This allows you to see what data is being sent through, and when, and also lets you observe caching and offline behaviour.
You don't need to implement custom tooling to get visibility into what's happening with Electric. It's not a black box when it comes to debugging in development and in production.
Next steps
This post has outlined how you can develop local-first software incrementally, using your existing API alongside Electric for read-path sync.
To learn more and get started with Electric, see the Quickstart, Documentation and source code on GitHub: