Real-time Architecture
Technology Stack
Socket.IO
WebSocket communication for live updates and notifications
Liveblocks
Collaborative editing with presence and cursor tracking
Server-Sent Events
Streaming updates for long-running operations
WebRTC
Direct peer-to-peer communication for voice calls
Connection Architecture
Realtime Implementation
Publish Helper
// lib/realtime/publish.ts
import { channels } from '@repo/realtime';
import { publish } from '@repo/realtime/server';
/**
 * Publishes an `org.update` message on the realtime channel of one organization.
 *
 * @param orgClerkId - Organization identifier handed to `channels.org` to
 *   resolve the target channel.
 * @param payload - Message payload; forwarded as-is under the `payload` key.
 */
export async function publishOrgUpdate(orgClerkId: string, payload: unknown) {
  const orgChannel = channels.org(orgClerkId);
  const message = { type: 'org.update' as const, payload };

  await publish(orgChannel, message);
}
Channel Authorization
// app/api/realtime/auth/route.ts
/**
 * Realtime channel authorization endpoint.
 *
 * Resolves the caller through `requireApiContext` and returns the result of
 * authorizing a `subscribe` capability on the caller's organization channel.
 */
export async function POST(request: Request) {
  const context = await requireApiContext(request);

  return authorizeRealtimeChannels({
    userId: context.userId,
    channels: [channels.org(context.orgId)],
    capability: 'subscribe',
  });
}
Presence Tracking
User Presence System
// lib/presence-manager.ts
/**
 * In-memory presence registry that tracks which users have open sockets and
 * broadcasts presence changes to the `org:<orgId>` room.
 *
 * A user is considered online while at least one of their sockets is
 * registered; the presence record is kept (as `offline`) after the last
 * socket is removed so `lastSeen` remains available.
 */
export class PresenceManager {
  /** userId -> ids of the sockets currently registered for that user. */
  private readonly userSessions = new Map<string, Set<string>>();
  /** userId -> most recent presence record for that user. */
  private readonly userStatus = new Map<string, UserPresenceStatus>();

  constructor(private io: SocketIOServer) {}

  /**
   * Registers a socket for a user, marks the user online and broadcasts
   * `presence:update`.
   *
   * When the user is already online in the same organization (e.g. a second
   * tab connects), the previously reported `currentLocation` is preserved
   * instead of being reset to `null`.
   */
  addUserSession(userId: string, socketId: string, orgId: string) {
    let sessions = this.userSessions.get(userId);
    if (!sessions) {
      sessions = new Set();
      this.userSessions.set(userId, sessions);
    }
    sessions.add(socketId);

    const previous = this.userStatus.get(userId);
    // Only carry the location over when it still describes an active session
    // in the same organization; otherwise start from a clean record.
    const carriedLocation =
      previous && previous.status !== 'offline' && previous.orgId === orgId
        ? previous.currentLocation
        : null;

    this.userStatus.set(userId, {
      status: 'online',
      lastSeen: new Date(),
      currentLocation: carriedLocation,
      orgId
    });

    this.broadcastPresenceUpdate(orgId, userId, 'online');
  }

  /**
   * Unregisters a socket. When it was the user's last socket the user is
   * recorded as offline and `presence:update` is broadcast; otherwise nothing
   * is broadcast. Unknown users are ignored.
   */
  removeUserSession(userId: string, socketId: string, orgId: string) {
    const sessions = this.userSessions.get(userId);
    if (!sessions) return;

    sessions.delete(socketId);
    if (sessions.size > 0) return;

    this.userSessions.delete(userId);
    this.userStatus.set(userId, {
      status: 'offline',
      lastSeen: new Date(),
      currentLocation: null,
      orgId
    });
    this.broadcastPresenceUpdate(orgId, userId, 'offline');
  }

  /**
   * Records the user's current page/view and broadcasts `presence:location`.
   * Does nothing for users without a presence record.
   */
  updateUserLocation(userId: string, location: string, orgId: string) {
    const status = this.userStatus.get(userId);
    if (!status) return;

    // The record is stored by reference, so mutating it updates the map entry.
    status.currentLocation = location;
    this.broadcastLocationUpdate(orgId, userId, location);
  }

  /**
   * Returns a snapshot of every presence record belonging to `orgId`.
   * Each entry is a copy of the stored record with the `userId` added.
   */
  getOrganizationPresence(
    orgId: string
  ): Array<UserPresenceStatus & { userId: string }> {
    return Array.from(this.userStatus.entries())
      .filter(([, status]) => status.orgId === orgId)
      .map(([userId, status]) => ({
        userId,
        ...status
      }));
  }

  /** Emits `presence:update` with an ISO timestamp to the organization room. */
  private broadcastPresenceUpdate(
    orgId: string,
    userId: string,
    status: 'online' | 'offline'
  ) {
    this.io.to(`org:${orgId}`).emit('presence:update', {
      userId,
      status,
      timestamp: new Date().toISOString()
    });
  }

  /** Emits `presence:location` with an ISO timestamp to the organization room. */
  private broadcastLocationUpdate(
    orgId: string,
    userId: string,
    location: string
  ) {
    this.io.to(`org:${orgId}`).emit('presence:location', {
      userId,
      location,
      timestamp: new Date().toISOString()
    });
  }
}
/** Presence record kept per user by `PresenceManager`. */
interface UserPresenceStatus {
  /** Organization the record belongs to; used to filter presence per organization. */
  orgId: string;
  /** Connection state of the user. */
  status: 'online' | 'offline' | 'away';
  /** Current page/view reported by the user, or `null` when none is recorded. */
  currentLocation: string | null;
  /** Time at which the record was last written. */
  lastSeen: Date;
}
Client-Side Presence
// hooks/usePresence.ts
import { useSocket } from './useSocket';
import { useAuth } from '@repo/auth';
/**
 * Subscribes the current user to organization presence over the shared socket.
 *
 * While a socket, user and organization are available the hook emits
 * `presence:join`, tracks `presence:update` / `presence:location` events and
 * emits `presence:leave` on cleanup.
 *
 * @returns `onlineUsers` (users currently reported online), `userLocations`
 *   (userId -> last reported location) and `updateLocation` to report the
 *   current user's own location.
 */
export function usePresence() {
  const socket = useSocket();
  const { user, organization } = useAuth();
  const [onlineUsers, setOnlineUsers] = useState<UserPresence[]>([]);
  const [userLocations, setUserLocations] = useState<Record<string, string>>({});

  useEffect(() => {
    if (!socket || !user || !organization) return;

    // Handlers are named so cleanup can detach exactly these listeners.
    // Calling `socket.off(event)` without a listener would also remove
    // listeners that other components registered on the same socket.
    const handlePresenceUpdate = (data: {
      userId: string;
      status: 'online' | 'offline';
      timestamp: string;
    }) => {
      setOnlineUsers(prev => {
        const others = prev.filter(u => u.userId !== data.userId);
        if (data.status !== 'online') return others;
        return [
          ...others,
          {
            userId: data.userId,
            status: data.status,
            lastSeen: data.timestamp
          }
        ];
      });
    };

    const handleLocationUpdate = (data: { userId: string; location: string }) => {
      setUserLocations(prev => ({
        ...prev,
        [data.userId]: data.location
      }));
    };

    // Join presence system
    socket.emit('presence:join', {
      userId: user.id,
      orgId: organization.id
    });

    socket.on('presence:update', handlePresenceUpdate);
    socket.on('presence:location', handleLocationUpdate);

    return () => {
      socket.emit('presence:leave');
      socket.off('presence:update', handlePresenceUpdate);
      socket.off('presence:location', handleLocationUpdate);
    };
  }, [socket, user, organization]);

  // Report the current user's location (page/view) to the server.
  const updateLocation = useCallback((location: string) => {
    if (socket && user && organization) {
      socket.emit('presence:location_update', {
        location,
        userId: user.id,
        orgId: organization.id
      });
    }
  }, [socket, user, organization]);

  return {
    onlineUsers,
    userLocations,
    updateLocation
  };
}
Live Data Updates
Real-time Data Broadcasting
// lib/realtime-broadcaster.ts
/**
 * Emits CRM domain events to Socket.IO rooms.
 *
 * Organization-wide events go to the `org:<orgId>` room; task assignments are
 * additionally sent to the assignee's `user:<userId>` room. Every payload
 * carries an ISO-8601 `timestamp` generated at emit time.
 */
export class RealtimeBroadcaster {
  constructor(private io: SocketIOServer) {}

  /** Emits `deal:updated` with the changed fields and the acting user. */
  broadcastDealUpdate(
    orgId: string,
    dealId: string,
    updates: Partial<Deal>,
    userId: string
  ) {
    this.emitStamped(`org:${orgId}`, 'deal:updated', {
      dealId,
      updates,
      updatedBy: userId
    });
  }

  /** Emits `contact:created` with the new contact and its creator. */
  broadcastContactCreated(
    orgId: string,
    contact: Person,
    userId: string
  ) {
    this.emitStamped(`org:${orgId}`, 'contact:created', {
      contact,
      createdBy: userId
    });
  }

  /** Emits `communication:new` for a communication record. */
  broadcastCommunication(
    orgId: string,
    communication: Communication
  ) {
    this.emitStamped(`org:${orgId}`, 'communication:new', { communication });
  }

  /**
   * Emits `task:assigned` to the assignee's room, then `task:created` to the
   * organization room. Each emit gets its own timestamp.
   */
  broadcastTaskAssignment(
    orgId: string,
    task: Task,
    assignedToId: string,
    assignedBy: string
  ) {
    this.emitStamped(`user:${assignedToId}`, 'task:assigned', { task, assignedBy });
    this.emitStamped(`org:${orgId}`, 'task:created', { task, assignedBy });
  }

  /** Emits `call:event` describing a started, ended or missed call. */
  broadcastCallEvent(
    orgId: string,
    callData: TwilioCall,
    eventType: 'started' | 'ended' | 'missed'
  ) {
    this.emitStamped(`org:${orgId}`, 'call:event', { callData, eventType });
  }

  /** Sends `fields` plus a trailing `timestamp` to `room` under `event`. */
  private emitStamped(
    room: string,
    event: string,
    fields: Record<string, unknown>
  ) {
    this.io.to(room).emit(event, {
      ...fields,
      timestamp: new Date().toISOString()
    });
  }
}
Client-Side Data Synchronization
// hooks/useRealtimeData.ts
/**
 * Loads data from `endpoint` and keeps it in sync with socket events.
 *
 * - The endpoint is fetched on mount and whenever `endpoint` changes.
 * - For each name in `options.realtimeEvents` a socket listener is attached;
 *   array data is delegated to `updateArrayData`, object data is shallow-merged
 *   with the event payload.
 *
 * NOTE(review): `options.pollInterval` is accepted but nothing in this hook
 * reads it — confirm whether polling is still planned.
 *
 * @param endpoint - URL passed to `authFetch`.
 * @param options - Optional settings; see above.
 * @returns `data`, `loading`, `error` (message or `null`) and `refresh` to
 *   re-run the fetch manually.
 */
export function useRealtimeData<T>(
  endpoint: string,
  options?: {
    pollInterval?: number;
    realtimeEvents?: string[];
  }
) {
  const socket = useSocket();
  const [data, setData] = useState<T | null>(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);

  // Callers commonly pass a fresh array literal on every render. Keying the
  // subscription effect on the serialized names (instead of array identity)
  // avoids detaching and re-attaching every listener on each render.
  const eventsKey = JSON.stringify(options?.realtimeEvents ?? []);

  // Fetches the endpoint. `isStale` lets the caller mark the request as
  // superseded so a late response cannot overwrite newer state.
  const fetchData = useCallback(
    async (isStale: () => boolean = () => false) => {
      try {
        setLoading(true);
        const response = await authFetch(endpoint);
        // Presumably `authFetch` resolves to a fetch `Response`; the strict
        // comparison keeps this a no-op if it does not expose `ok`.
        if (response.ok === false) {
          throw new Error(`Request to ${endpoint} failed with status ${response.status}`);
        }
        const result = await response.json();
        if (isStale()) return;
        setData(result);
        setError(null);
      } catch (err) {
        if (isStale()) return;
        // Catch variables are `unknown` under strict mode; narrow before use.
        setError(err instanceof Error ? err.message : String(err));
      } finally {
        if (!isStale()) {
          setLoading(false);
        }
      }
    },
    [endpoint]
  );

  // Setup real-time listeners
  useEffect(() => {
    const events: string[] = JSON.parse(eventsKey);
    if (!socket || events.length === 0) return;

    const handleUpdate = (updateData: unknown) => {
      setData(prevData => {
        if (Array.isArray(prevData)) {
          // Handle array updates (lists)
          return updateArrayData(prevData, updateData);
        }
        // Nothing to merge into before the first load, and non-object
        // payloads cannot be merged; keep the current value in both cases.
        if (
          prevData === null ||
          typeof updateData !== 'object' ||
          updateData === null
        ) {
          return prevData;
        }
        // Handle object updates
        return { ...prevData, ...updateData };
      });
    };

    events.forEach(event => {
      socket.on(event, handleUpdate);
    });

    return () => {
      events.forEach(event => {
        socket.off(event, handleUpdate);
      });
    };
  }, [socket, eventsKey]);

  // Initial fetch; the flag discards the response if the endpoint changes or
  // the component unmounts while the request is still in flight.
  useEffect(() => {
    let cancelled = false;
    void fetchData(() => cancelled);
    return () => {
      cancelled = true;
    };
  }, [fetchData]);

  // Manual refresh
  const refresh = useCallback(() => {
    void fetchData();
  }, [fetchData]);

  return { data, loading, error, refresh };
}
// Usage in components
/**
 * Socket events that should update the deals list.
 *
 * Declared once at module scope so the same array instance is passed on every
 * render; an inline literal would be a new array each render and can retrigger
 * effects that depend on its identity.
 */
const DEAL_REALTIME_EVENTS: string[] = [
  'deal:created',
  'deal:updated',
  'deal:deleted'
];

/**
 * Renders the deals returned by `/api/deals`, kept up to date through
 * `useRealtimeData`. Shows a spinner while loading and an error message when
 * the hook reports an error.
 */
export function DealsList() {
  const { data: deals, loading, error } = useRealtimeData<Deal[]>(
    '/api/deals',
    {
      realtimeEvents: DEAL_REALTIME_EVENTS
    }
  );

  if (loading) return <LoadingSpinner />;
  if (error) return <ErrorMessage error={error} />;

  return (
    <div>
      {deals?.map(deal => (
        <DealCard key={deal.id} deal={deal} />
      ))}
    </div>
  );
}
Collaborative Editing
Liveblocks Integration
// lib/liveblocks-config.ts
import { createClient } from '@liveblocks/client';
import { createRoomContext } from '@liveblocks/react';
// Throttle value handed to the Liveblocks client: 16 ms, i.e. about 60 FPS.
const LIVEBLOCKS_THROTTLE_MS = 16;

// Shared Liveblocks client; sessions are authorized through the app's own
// collaboration auth endpoint.
const client = createClient({
  throttle: LIVEBLOCKS_THROTTLE_MS,
  authEndpoint: '/api/collaboration/auth',
});

// Room-scoped provider and hooks bound to the client above.
const roomContext = createRoomContext(client);

export const {
  RoomProvider,
  useRoom,
  useMyPresence,
  useOthers,
  useBroadcastEvent,
  useEventListener,
  useStorage,
  useMutation
} = roomContext;
// Authentication endpoint
/**
 * Liveblocks authentication endpoint.
 *
 * Responds with:
 * - 401 when the caller has no user or organization,
 * - 400 when the body is not JSON or lacks a non-empty string `room`,
 * - 403 when `verifyRoomAccess` denies the room,
 * - otherwise the status and body produced by the Liveblocks session.
 */
export async function POST(req: Request) {
  const { userId, orgId } = await auth();
  if (!userId || !orgId) {
    return new Response('Unauthorized', { status: 401 });
  }

  // The body is untrusted input: reject malformed JSON and a missing or
  // non-string room instead of letting the handler throw.
  let room: unknown;
  try {
    const body: unknown = await req.json();
    room =
      typeof body === 'object' && body !== null
        ? (body as { room?: unknown }).room
        : undefined;
  } catch {
    return new Response('Invalid JSON body', { status: 400 });
  }
  if (typeof room !== 'string' || room.length === 0) {
    return new Response('Missing room', { status: 400 });
  }

  // Verify user has access to this room
  const hasAccess = await verifyRoomAccess(userId, orgId, room);
  if (!hasAccess) {
    return new Response('Forbidden', { status: 403 });
  }

  // NOTE(review): `user` is not defined anywhere in this snippet — the user
  // profile must be loaded before this point; confirm the intended source.
  const session = liveblocks.prepareSession(userId, {
    userInfo: {
      name: user.firstName + ' ' + user.lastName,
      avatar: user.imageUrl,
      orgId: orgId
    }
  });

  // Give access to the room
  session.allow(room, session.FULL_ACCESS);

  const { status, body } = await session.authorize();
  return new Response(body, { status });
}
Collaborative Note Editing
// components/collaborative-note-editor.tsx
import { useStorage, useMutation, useOthers } from '@/lib/liveblocks-config';
import { Editor } from '@tiptap/react';
/**
 * Note editor shared between collaborators.
 *
 * Reads the note's content from Liveblocks storage, writes the editor's HTML
 * back on every update, and shows an avatar plus a counter for the other
 * users in the room.
 *
 * NOTE(review): `useEditor`, `StarterKit`, `Collaboration`,
 * `CollaborationCursor`, `ydoc`, `provider`, `user`, `getUserColor`,
 * `EditorContent` and the avatar components are not defined or imported in
 * this snippet — confirm where they come from.
 */
export function CollaborativeNoteEditor({ noteId }: { noteId: string }) {
  const others = useOthers();
  const content = useStorage((root) => root.notes[noteId]?.content);

  // Persists new HTML for this note and stamps the update time.
  const updateContent = useMutation(({ storage }, newContent: string) => {
    const notes = storage.get('notes');
    const existingNote = notes.get(noteId);
    notes.set(noteId, {
      ...existingNote,
      content: newContent,
      updatedAt: new Date().toISOString()
    });
  }, [noteId]);

  const collaborationExtension = Collaboration.configure({
    document: ydoc,
  });
  const cursorExtension = CollaborationCursor.configure({
    provider: provider,
    user: {
      name: user.firstName + ' ' + user.lastName,
      color: getUserColor(user.id),
    },
  });

  const editor = useEditor({
    extensions: [StarterKit, collaborationExtension, cursorExtension],
    content: content || '',
    onUpdate: ({ editor }) => {
      updateContent(editor.getHTML());
    },
  });

  // First character of each space-separated part of a display name.
  const toInitials = (fullName: string) =>
    fullName.split(' ').map(part => part[0]).join('');

  const collaboratorCount = others.length;
  const pluralSuffix = collaboratorCount > 1 ? 's' : '';

  return (
    <div className="relative">
      {/* Collaborative cursors */}
      <div className="absolute top-2 right-2 flex -space-x-2">
        {others.map(({ connectionId, info }) => (
          <Avatar key={connectionId} className="h-8 w-8 border-2 border-white">
            <AvatarImage src={info.avatar} />
            <AvatarFallback>
              {toInitials(info.name)}
            </AvatarFallback>
          </Avatar>
        ))}
      </div>
      {/* Editor */}
      <EditorContent editor={editor} className="min-h-[200px] p-4" />
      {/* Live indicators */}
      {collaboratorCount > 0 ? (
        <div className="text-sm text-gray-500 mt-2">
          {collaboratorCount} other{pluralSuffix} editing
        </div>
      ) : null}
    </div>
  );
}
Shared Presence Cursors
// components/presence-cursors.tsx
/**
 * Full-size overlay that publishes the local pointer position as Liveblocks
 * presence and draws a dot with a name label for every other user whose
 * presence currently has a cursor.
 */
export function PresenceCursors() {
  const [, updateMyPresence] = useMyPresence();
  const others = useOthers();

  // Publish the pointer position relative to the overlay's top-left corner.
  const handlePointerMove = useCallback((e: React.PointerEvent) => {
    const bounds = e.currentTarget.getBoundingClientRect();
    const x = e.clientX - bounds.left;
    const y = e.clientY - bounds.top;
    updateMyPresence({ cursor: { x, y } });
  }, [updateMyPresence]);

  // Clear the cursor once the pointer leaves the overlay.
  const handlePointerLeave = useCallback(() => {
    updateMyPresence({ cursor: null });
  }, [updateMyPresence]);

  return (
    <div
      className="relative w-full h-full"
      onPointerMove={handlePointerMove}
      onPointerLeave={handlePointerLeave}
    >
      {/* Other users' cursors */}
      {others.map(({ connectionId, presence, info }) =>
        presence.cursor ? (
          <div
            key={connectionId}
            className="absolute pointer-events-none"
            style={{
              left: presence.cursor.x,
              top: presence.cursor.y,
              transform: 'translate(-50%, -50%)'
            }}
          >
            <div
              className="w-3 h-3 rounded-full border-2 border-white"
              style={{ backgroundColor: info.color }}
            />
            <div className="absolute top-4 left-2 px-2 py-1 text-xs font-medium text-white bg-black rounded whitespace-nowrap">
              {info.name}
            </div>
          </div>
        ) : null
      )}
    </div>
  );
}
Server-Sent Events
Streaming Long Operations
// api/deals/[id]/export/route.ts
/**
 * Streams the progress of a deal export as Server-Sent Events.
 *
 * Emits `progress` events while `exportDealData` runs, then a single
 * `complete` (with `downloadUrl`) or `error` (with the error message) event
 * before the stream is closed.
 *
 * Responds with 401 when the caller has no user or organization.
 */
export async function GET(
  req: Request,
  { params }: { params: { id: string } }
) {
  const { userId, orgId } = await auth();
  // Never start an export for an unauthenticated caller.
  if (!userId || !orgId) {
    return new Response('Unauthorized', { status: 401 });
  }
  const dealId = params.id;

  // Create SSE stream
  const encoder = new TextEncoder();
  // Set once the stream is finished or the client went away; writing to a
  // closed controller throws, so late callbacks must become no-ops.
  let closed = false;

  const stream = new ReadableStream({
    start(controller) {
      const sendEvent = (event: string, data: unknown) => {
        if (closed) return;
        controller.enqueue(encoder.encode(
          `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
        ));
      };

      const finish = () => {
        if (closed) return;
        closed = true;
        controller.close();
      };

      // Start export process
      exportDealData(dealId, orgId, {
        onProgress: (progress) => {
          sendEvent('progress', { progress });
        },
        onComplete: (downloadUrl) => {
          sendEvent('complete', { downloadUrl });
          finish();
        },
        onError: (error) => {
          sendEvent('error', { error: error.message });
          finish();
        }
      });
    },
    cancel() {
      // The consumer cancelled the stream (e.g. the client disconnected).
      closed = true;
    }
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
Client-Side SSE Consumer
// hooks/useServerSentEvents.ts
/**
 * Subscribes to a Server-Sent Events endpoint for the lifetime of the
 * component.
 *
 * Collects unnamed messages and the named `progress`, `complete` and `error`
 * events into `events` (each as `{ type, data, timestamp }`). The connection
 * is closed after a `complete` event, after a server-sent `error` event, and
 * on unmount.
 *
 * @param url - Endpoint to connect to; changing it reconnects.
 * @param options - `EventSource` init options; changing `withCredentials`
 *   reconnects.
 * @returns The collected `events` and the connection `status`.
 */
export function useServerSentEvents(url: string, options?: EventSourceInit) {
  const [events, setEvents] = useState<any[]>([]);
  const [status, setStatus] = useState<'connecting' | 'connected' | 'error' | 'closed'>('connecting');

  // Depend on the primitive flag rather than the options object so a fresh
  // object literal per render does not force a reconnect.
  const withCredentials = options?.withCredentials ?? false;

  useEffect(() => {
    setStatus('connecting');
    const eventSource = new EventSource(url, { withCredentials });

    // Appends one event; payloads that are not valid JSON are kept as the raw
    // value instead of throwing inside the event handler.
    const record = (type: string, raw: unknown) => {
      let data: unknown = raw;
      if (typeof raw === 'string') {
        try {
          data = JSON.parse(raw);
        } catch {
          data = raw;
        }
      }
      setEvents(prev => [...prev, { type, data, timestamp: Date.now() }]);
    };

    eventSource.onopen = () => {
      setStatus('connected');
    };

    eventSource.onmessage = (event) => {
      record('message', event.data);
    };

    eventSource.addEventListener('progress', (event) => {
      record('progress', (event as MessageEvent).data);
    });

    eventSource.addEventListener('complete', (event) => {
      record('complete', (event as MessageEvent).data);
      eventSource.close();
      setStatus('closed');
    });

    // `error` fires both for server-sent `event: error` frames (a
    // MessageEvent carrying data) and for connection failures (a plain Event
    // without data). Only the former has a payload to parse.
    eventSource.addEventListener('error', (event) => {
      if (event instanceof MessageEvent) {
        record('error', event.data);
        eventSource.close();
      }
      setStatus('error');
    });

    return () => {
      eventSource.close();
    };
  }, [url, withCredentials]);

  return { events, status };
}
Performance Optimization
Connection Pooling
// lib/connection-pool.ts
/**
 * Registry of connected sockets with a per-organization connection cap.
 *
 * When an organization is at its cap, adding a socket evicts that
 * organization's oldest socket (smallest `connectedAt`): it is removed from
 * the pool and disconnected.
 */
export class SocketConnectionPool {
  /** socket.id -> socket. */
  private connections = new Map<string, Socket>();

  /**
   * @param maxConnections - Maximum number of pooled sockets per
   *   organization. Defaults to 1000.
   */
  constructor(private maxConnections: number = 1000) {}

  /**
   * Adds a socket to the pool, evicting the organization's oldest socket
   * first when the organization is already at its limit.
   *
   * @param userId - Currently unused; kept for interface compatibility.
   * @param socket - Socket to register, keyed by `socket.id`.
   */
  addConnection(userId: string, socket: Socket) {
    // A socket being re-registered must not count against its own limit.
    const orgConnections = this.getOrganizationConnections(socket.orgId)
      .filter(s => s.id !== socket.id);

    if (orgConnections.length > 0 && orgConnections.length >= this.maxConnections) {
      // Strict `<` keeps the earliest-inserted socket on ties.
      const oldestSocket = orgConnections.reduce((oldest, candidate) =>
        candidate.connectedAt < oldest.connectedAt ? candidate : oldest
      );
      // Remove the entry before disconnecting so the pool stays bounded even
      // if no disconnect handler calls `removeConnection`.
      this.connections.delete(oldestSocket.id);
      oldestSocket.disconnect(true);
    }

    this.connections.set(socket.id, socket);
  }

  /** Removes the socket with the given id; unknown ids are ignored. */
  removeConnection(socketId: string) {
    this.connections.delete(socketId);
  }

  /** Returns the pooled sockets that belong to `orgId`. */
  getOrganizationConnections(orgId: string): Socket[] {
    return Array.from(this.connections.values())
      .filter(socket => socket.orgId === orgId);
  }
}
Message Throttling
// lib/message-throttle.ts
/**
 * Builds a rate-limited replacement for `socket.on`.
 *
 * All events registered through the returned function share one sliding
 * window: at most `maxMessages` messages are passed to their handlers within
 * any `timeWindow` milliseconds. Messages over the limit are dropped and
 * answered with a `rate_limit_exceeded` event on the same socket.
 *
 * @param socket - Socket to listen on and to notify about exceeded limits.
 * @param maxMessages - Messages allowed per window (default 100).
 * @param timeWindow - Window length in milliseconds (default 60000).
 * @returns A function `(event, handler)` that registers a throttled listener.
 */
export function createMessageThrottle(
  socket: Socket,
  maxMessages: number = 100,
  timeWindow: number = 60000 // 1 minute
) {
  // Acceptance times of the messages still counted, oldest first.
  const acceptedAt: number[] = [];

  // Drops the leading run of entries that fell out of the window.
  const dropExpired = (now: number) => {
    const cutoff = now - timeWindow;
    let expired = 0;
    while (expired < acceptedAt.length && acceptedAt[expired] < cutoff) {
      expired += 1;
    }
    if (expired > 0) {
      acceptedAt.splice(0, expired);
    }
  };

  return (event: string, handler: Function) => {
    socket.on(event, (...args) => {
      const now = Date.now();
      dropExpired(now);

      if (acceptedAt.length < maxMessages) {
        acceptedAt.push(now);
        handler(...args);
        return;
      }

      // Over the limit: tell the sender and drop the message.
      socket.emit('rate_limit_exceeded', {
        event,
        limit: maxMessages,
        window: timeWindow
      });
    });
  };
}
Next Steps
Communication Systems
Explore real-time communication features
AI Features
Learn about real-time AI processing
Performance Optimization
Optimize real-time performance and scaling
Real-time features are essential for modern CRM collaboration. This architecture ensures low latency, high availability, and seamless user experiences across all real estate workflows.