Skip to main content
Winnerr CRM provides comprehensive real-time functionality to enable seamless collaboration among real estate teams. Built on Socket.IO and Liveblocks, the system supports presence tracking, live updates, collaborative editing, and instant notifications.

Real-time Architecture

Technology Stack

Socket.IO

WebSocket communication for live updates and notifications

Liveblocks

Collaborative editing with presence and cursor tracking

Server-Sent Events

Streaming updates for long-running operations

WebRTC

Direct peer-to-peer communication for voice calls

Connection Architecture

Realtime Implementation

Publish Helper

// lib/realtime/publish.ts
import { channels } from '@repo/realtime';
import { publish } from '@repo/realtime/server';

export async function publishOrgUpdate(orgClerkId: string, payload: unknown) {
  await publish(channels.org(orgClerkId), {
    type: 'org.update',
    payload,
  });
}

Channel Authorization

// app/api/realtime/auth/route.ts
export async function POST(request: Request) {
  const { userId, orgId } = await requireApiContext(request);
  return authorizeRealtimeChannels({
    userId,
    channels: [channels.org(orgId)],
    capability: 'subscribe',
  });
}

Presence Tracking

User Presence System

// lib/presence-manager.ts
export class PresenceManager {
  private userSessions = new Map<string, Set<string>>(); // userId -> socketIds
  private userStatus = new Map<string, UserPresenceStatus>();
  
  constructor(private io: SocketIOServer) {}
  
  // Track user connection
  addUserSession(userId: string, socketId: string, orgId: string) {
    if (!this.userSessions.has(userId)) {
      this.userSessions.set(userId, new Set());
    }
    
    this.userSessions.get(userId)!.add(socketId);
    
    // Update user status
    this.userStatus.set(userId, {
      status: 'online',
      lastSeen: new Date(),
      currentLocation: null,
      orgId
    });
    
    // Broadcast presence update
    this.broadcastPresenceUpdate(orgId, userId, 'online');
  }
  
  // Remove user session
  removeUserSession(userId: string, socketId: string, orgId: string) {
    const sessions = this.userSessions.get(userId);
    if (sessions) {
      sessions.delete(socketId);
      
      // If no more sessions, mark user as offline
      if (sessions.size === 0) {
        this.userSessions.delete(userId);
        this.userStatus.set(userId, {
          status: 'offline',
          lastSeen: new Date(),
          currentLocation: null,
          orgId
        });
        
        this.broadcastPresenceUpdate(orgId, userId, 'offline');
      }
    }
  }
  
  // Update user location (current page/view)
  updateUserLocation(userId: string, location: string, orgId: string) {
    const status = this.userStatus.get(userId);
    if (status) {
      status.currentLocation = location;
      this.userStatus.set(userId, status);
      
      this.broadcastLocationUpdate(orgId, userId, location);
    }
  }
  
  // Get organization presence
  getOrganizationPresence(orgId: string): UserPresenceStatus[] {
    return Array.from(this.userStatus.entries())
      .filter(([_, status]) => status.orgId === orgId)
      .map(([userId, status]) => ({
        userId,
        ...status
      }));
  }
  
  private broadcastPresenceUpdate(
    orgId: string, 
    userId: string, 
    status: 'online' | 'offline'
  ) {
    this.io.to(`org:${orgId}`).emit('presence:update', {
      userId,
      status,
      timestamp: new Date().toISOString()
    });
  }
  
  private broadcastLocationUpdate(
    orgId: string, 
    userId: string, 
    location: string
  ) {
    this.io.to(`org:${orgId}`).emit('presence:location', {
      userId,
      location,
      timestamp: new Date().toISOString()
    });
  }
}

interface UserPresenceStatus {
  status: 'online' | 'offline' | 'away';
  lastSeen: Date;
  currentLocation: string | null;
  orgId: string;
}

Client-Side Presence

// hooks/usePresence.ts
import { useSocket } from './useSocket';
import { useAuth } from '@repo/auth';

export function usePresence() {
  const socket = useSocket();
  const { user, organization } = useAuth();
  const [onlineUsers, setOnlineUsers] = useState<UserPresence[]>([]);
  const [userLocations, setUserLocations] = useState<Record<string, string>>({});
  
  useEffect(() => {
    if (!socket || !user || !organization) return;
    
    // Join presence system
    socket.emit('presence:join', {
      userId: user.id,
      orgId: organization.id
    });
    
    // Listen for presence updates
    socket.on('presence:update', (data) => {
      setOnlineUsers(prev => {
        if (data.status === 'online') {
          return [...prev.filter(u => u.userId !== data.userId), {
            userId: data.userId,
            status: data.status,
            lastSeen: data.timestamp
          }];
        } else {
          return prev.filter(u => u.userId !== data.userId);
        }
      });
    });
    
    // Listen for location updates
    socket.on('presence:location', (data) => {
      setUserLocations(prev => ({
        ...prev,
        [data.userId]: data.location
      }));
    });
    
    return () => {
      socket.emit('presence:leave');
      socket.off('presence:update');
      socket.off('presence:location');
    };
  }, [socket, user, organization]);
  
  // Update current location
  const updateLocation = useCallback((location: string) => {
    if (socket && user && organization) {
      socket.emit('presence:location_update', {
        location,
        userId: user.id,
        orgId: organization.id
      });
    }
  }, [socket, user, organization]);
  
  return {
    onlineUsers,
    userLocations,
    updateLocation
  };
}

Live Data Updates

Real-time Data Broadcasting

// lib/realtime-broadcaster.ts
export class RealtimeBroadcaster {
  constructor(private io: SocketIOServer) {}
  
  // Broadcast deal updates
  broadcastDealUpdate(
    orgId: string, 
    dealId: string, 
    updates: Partial<Deal>, 
    userId: string
  ) {
    this.io.to(`org:${orgId}`).emit('deal:updated', {
      dealId,
      updates,
      updatedBy: userId,
      timestamp: new Date().toISOString()
    });
  }
  
  // Broadcast new contact creation
  broadcastContactCreated(
    orgId: string, 
    contact: Person, 
    userId: string
  ) {
    this.io.to(`org:${orgId}`).emit('contact:created', {
      contact,
      createdBy: userId,
      timestamp: new Date().toISOString()
    });
  }
  
  // Broadcast communication events
  broadcastCommunication(
    orgId: string, 
    communication: Communication
  ) {
    this.io.to(`org:${orgId}`).emit('communication:new', {
      communication,
      timestamp: new Date().toISOString()
    });
  }
  
  // Broadcast task assignments
  broadcastTaskAssignment(
    orgId: string, 
    task: Task, 
    assignedToId: string,
    assignedBy: string
  ) {
    // Send to assigned user specifically
    this.io.to(`user:${assignedToId}`).emit('task:assigned', {
      task,
      assignedBy,
      timestamp: new Date().toISOString()
    });
    
    // Broadcast to organization
    this.io.to(`org:${orgId}`).emit('task:created', {
      task,
      assignedBy,
      timestamp: new Date().toISOString()
    });
  }
  
  // Broadcast phone call events
  broadcastCallEvent(
    orgId: string, 
    callData: TwilioCall, 
    eventType: 'started' | 'ended' | 'missed'
  ) {
    this.io.to(`org:${orgId}`).emit('call:event', {
      callData,
      eventType,
      timestamp: new Date().toISOString()
    });
  }
}

Client-Side Data Synchronization

// hooks/useRealtimeData.ts
export function useRealtimeData<T>(
  endpoint: string,
  options?: {
    pollInterval?: number;
    realtimeEvents?: string[];
  }
) {
  const socket = useSocket();
  const [data, setData] = useState<T | null>(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);
  
  // Initial data fetch
  const fetchData = useCallback(async () => {
    try {
      setLoading(true);
      const response = await authFetch(endpoint);
      const result = await response.json();
      setData(result);
      setError(null);
    } catch (err) {
      setError(err.message);
    } finally {
      setLoading(false);
    }
  }, [endpoint]);
  
  // Setup real-time listeners
  useEffect(() => {
    if (!socket || !options?.realtimeEvents) return;
    
    const handleUpdate = (updateData: unknown) => {
      setData(prevData => {
        if (Array.isArray(prevData)) {
          // Handle array updates (lists)
          return updateArrayData(prevData, updateData);
        } else {
          // Handle object updates
          return { ...prevData, ...updateData };
        }
      });
    };
    
    // Listen to specified events
    options.realtimeEvents.forEach(event => {
      socket.on(event, handleUpdate);
    });
    
    return () => {
      options.realtimeEvents?.forEach(event => {
        socket.off(event, handleUpdate);
      });
    };
  }, [socket, options?.realtimeEvents]);
  
  // Initial fetch
  useEffect(() => {
    fetchData();
  }, [fetchData]);
  
  // Manual refresh
  const refresh = useCallback(() => {
    fetchData();
  }, [fetchData]);
  
  return { data, loading, error, refresh };
}

// Usage in components
export function DealsList() {
  const { data: deals, loading, error } = useRealtimeData<Deal[]>(
    '/api/deals',
    {
      realtimeEvents: ['deal:created', 'deal:updated', 'deal:deleted']
    }
  );
  
  if (loading) return <LoadingSpinner />;
  if (error) return <ErrorMessage error={error} />;
  
  return (
    <div>
      {deals?.map(deal => (
        <DealCard key={deal.id} deal={deal} />
      ))}
    </div>
  );
}

Collaborative Editing

Liveblocks Integration

// lib/liveblocks-config.ts
import { createClient } from '@liveblocks/client';
import { createRoomContext } from '@liveblocks/react';

const client = createClient({
  authEndpoint: '/api/collaboration/auth',
  throttle: 16, // 60 FPS
});

export const {
  RoomProvider,
  useRoom,
  useMyPresence,
  useOthers,
  useBroadcastEvent,
  useEventListener,
  useStorage,
  useMutation
} = createRoomContext(client);

// Authentication endpoint
export async function POST(req: Request) {
  const { userId, orgId } = await auth();
  
  if (!userId || !orgId) {
    return new Response('Unauthorized', { status: 401 });
  }
  
  const { room } = await req.json();
  
  // Verify user has access to this room
  const hasAccess = await verifyRoomAccess(userId, orgId, room);
  
  if (!hasAccess) {
    return new Response('Forbidden', { status: 403 });
  }
  
  const session = liveblocks.prepareSession(userId, {
    userInfo: {
      name: user.firstName + ' ' + user.lastName,
      avatar: user.imageUrl,
      orgId: orgId
    }
  });
  
  // Give access to the room
  session.allow(room, session.FULL_ACCESS);
  
  const { status, body } = await session.authorize();
  return new Response(body, { status });
}

Collaborative Note Editing

// components/collaborative-note-editor.tsx
import { useStorage, useMutation, useOthers } from '@/lib/liveblocks-config';
import { Editor } from '@tiptap/react';

export function CollaborativeNoteEditor({ noteId }: { noteId: string }) {
  const others = useOthers();
  const content = useStorage((root) => root.notes[noteId]?.content);
  
  const updateContent = useMutation(({ storage }, newContent: string) => {
    const notes = storage.get('notes');
    notes.set(noteId, {
      ...notes.get(noteId),
      content: newContent,
      updatedAt: new Date().toISOString()
    });
  }, [noteId]);
  
  const editor = useEditor({
    extensions: [
      StarterKit,
      Collaboration.configure({
        document: ydoc,
      }),
      CollaborationCursor.configure({
        provider: provider,
        user: {
          name: user.firstName + ' ' + user.lastName,
          color: getUserColor(user.id),
        },
      }),
    ],
    content: content || '',
    onUpdate: ({ editor }) => {
      updateContent(editor.getHTML());
    },
  });
  
  return (
    <div className="relative">
      {/* Collaborative cursors */}
      <div className="absolute top-2 right-2 flex -space-x-2">
        {others.map(({ connectionId, presence, info }) => (
          <Avatar key={connectionId} className="h-8 w-8 border-2 border-white">
            <AvatarImage src={info.avatar} />
            <AvatarFallback>
              {info.name.split(' ').map(n => n[0]).join('')}
            </AvatarFallback>
          </Avatar>
        ))}
      </div>
      
      {/* Editor */}
      <EditorContent editor={editor} className="min-h-[200px] p-4" />
      
      {/* Live indicators */}
      {others.length > 0 && (
        <div className="text-sm text-gray-500 mt-2">
          {others.length} other{others.length > 1 ? 's' : ''} editing
        </div>
      )}
    </div>
  );
}

Shared Presence Cursors

// components/presence-cursors.tsx
export function PresenceCursors() {
  const [myPresence, updateMyPresence] = useMyPresence();
  const others = useOthers();
  
  const handlePointerMove = useCallback((e: React.PointerEvent) => {
    const rect = e.currentTarget.getBoundingClientRect();
    updateMyPresence({
      cursor: {
        x: e.clientX - rect.left,
        y: e.clientY - rect.top,
      }
    });
  }, [updateMyPresence]);
  
  const handlePointerLeave = useCallback(() => {
    updateMyPresence({ cursor: null });
  }, [updateMyPresence]);
  
  return (
    <div
      className="relative w-full h-full"
      onPointerMove={handlePointerMove}
      onPointerLeave={handlePointerLeave}
    >
      {/* Other users' cursors */}
      {others.map(({ connectionId, presence, info }) => {
        if (!presence.cursor) return null;
        
        return (
          <div
            key={connectionId}
            className="absolute pointer-events-none"
            style={{
              left: presence.cursor.x,
              top: presence.cursor.y,
              transform: 'translate(-50%, -50%)'
            }}
          >
            <div
              className="w-3 h-3 rounded-full border-2 border-white"
              style={{ backgroundColor: info.color }}
            />
            <div className="absolute top-4 left-2 px-2 py-1 text-xs font-medium text-white bg-black rounded whitespace-nowrap">
              {info.name}
            </div>
          </div>
        );
      })}
    </div>
  );
}

Server-Sent Events

Streaming Long Operations

// api/deals/[id]/export/route.ts
export async function GET(
  req: Request,
  { params }: { params: { id: string } }
) {
  const { userId, orgId } = await auth();
  const dealId = params.id;
  
  // Create SSE stream
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    start(controller) {
      const sendEvent = (event: string, data: unknown) => {
        controller.enqueue(encoder.encode(
          `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
        ));
      };
      
      // Start export process
      exportDealData(dealId, orgId, {
        onProgress: (progress) => {
          sendEvent('progress', { progress });
        },
        onComplete: (downloadUrl) => {
          sendEvent('complete', { downloadUrl });
          controller.close();
        },
        onError: (error) => {
          sendEvent('error', { error: error.message });
          controller.close();
        }
      });
    }
  });
  
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

Client-Side SSE Consumer

// hooks/useServerSentEvents.ts
export function useServerSentEvents(url: string, options?: EventSourceInit) {
  const [events, setEvents] = useState<any[]>([]);
  const [status, setStatus] = useState<'connecting' | 'connected' | 'error' | 'closed'>('connecting');
  
  useEffect(() => {
    const eventSource = new EventSource(url, options);
    
    eventSource.onopen = () => {
      setStatus('connected');
    };
    
    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, { type: 'message', data, timestamp: Date.now() }]);
    };
    
    eventSource.addEventListener('progress', (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, { type: 'progress', data, timestamp: Date.now() }]);
    });
    
    eventSource.addEventListener('complete', (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, { type: 'complete', data, timestamp: Date.now() }]);
      eventSource.close();
      setStatus('closed');
    });
    
    eventSource.addEventListener('error', (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, { type: 'error', data, timestamp: Date.now() }]);
      eventSource.close();
      setStatus('error');
    });
    
    eventSource.onerror = () => {
      setStatus('error');
    };
    
    return () => {
      eventSource.close();
    };
  }, [url]);
  
  return { events, status };
}

Performance Optimization

Connection Pooling

// lib/connection-pool.ts
export class SocketConnectionPool {
  private connections = new Map<string, Socket>();
  private maxConnections = 1000;
  
  addConnection(userId: string, socket: Socket) {
    // Implement connection limits per organization
    const orgConnections = Array.from(this.connections.values())
      .filter(s => s.orgId === socket.orgId);
    
    if (orgConnections.length >= this.maxConnections) {
      // Close oldest connection
      const oldestSocket = orgConnections
        .sort((a, b) => a.connectedAt - b.connectedAt)[0];
      oldestSocket.disconnect(true);
    }
    
    this.connections.set(socket.id, socket);
  }
  
  removeConnection(socketId: string) {
    this.connections.delete(socketId);
  }
  
  getOrganizationConnections(orgId: string): Socket[] {
    return Array.from(this.connections.values())
      .filter(socket => socket.orgId === orgId);
  }
}

Message Throttling

// lib/message-throttle.ts
export function createMessageThrottle(
  socket: Socket,
  maxMessages: number = 100,
  timeWindow: number = 60000 // 1 minute
) {
  const messageHistory: number[] = [];
  
  return (event: string, handler: Function) => {
    socket.on(event, (...args) => {
      const now = Date.now();
      
      // Clean old messages outside time window
      while (messageHistory.length > 0 && messageHistory[0] < now - timeWindow) {
        messageHistory.shift();
      }
      
      // Check rate limit
      if (messageHistory.length >= maxMessages) {
        socket.emit('rate_limit_exceeded', {
          event,
          limit: maxMessages,
          window: timeWindow
        });
        return;
      }
      
      messageHistory.push(now);
      handler(...args);
    });
  };
}

Next Steps

Communication Systems

Explore real-time communication features

AI Features

Learn about real-time AI processing

Performance Optimization

Optimize real-time performance and scaling

Real-time features are essential for modern CRM collaboration. This architecture ensures low latency, high availability, and seamless user experiences across all real estate workflows.