From a3be098e5b03ddad89d22348f788f48b8b3bc812 Mon Sep 17 00:00:00 2001 From: pulipakaa24 Date: Mon, 5 Jan 2026 20:36:21 -0600 Subject: [PATCH] rateLimit --- index.js | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/index.js b/index.js index ab8de19..1af940c 100644 --- a/index.js +++ b/index.js @@ -11,10 +11,41 @@ const { initializeAgenda } = require('./agenda'); // Agenda setup (CHANGE THIS F const format = require('pg-format'); const cronParser = require('cron-parser'); const { ObjectId } = require('mongodb'); +const { RateLimiterMemory } = require('rate-limiter-flexible'); + +// HTTP rate limiter: 5 requests per second per IP +const httpRateLimiter = new RateLimiterMemory({ + points: 5, + duration: 1, +}); + +// WebSocket connection rate limiter: 1 connection per second per IP +const wsConnectionRateLimiter = new RateLimiterMemory({ + points: 5, + duration: 1, +}); + +// WebSocket message rate limiter: 5 messages per second per socket +const wsMessageRateLimiter = new RateLimiterMemory({ + points: 5, + duration: 1, +}); const app = express(); const port = 3000; app.use(json()); + +// Rate limiting middleware for HTTP requests +app.use(async (req, res, next) => { + const ip = req.ip || req.connection.remoteAddress; + try { + await httpRateLimiter.consume(ip); + next(); + } catch (rejRes) { + res.status(429).json({ error: 'Too many requests' }); + } +}); + const pool = new Pool(); const server = http.createServer(app); const io = socketIo(server, { @@ -38,8 +69,39 @@ let agenda; const JWT_SECRET = process.env.JWT_SECRET; const TOKEN_EXPIRY = '5d'; +// Helper function to rate limit WebSocket messages +async function rateLimitSocketMessage(socket, eventName) { + try { + await wsMessageRateLimiter.consume(socket.id); + return true; + } catch (rejRes) { + socket.emit('error', { + type: 'error', + code: 429, + message: 'Too many messages. Please slow down.', + event: eventName + }); + return false; + } +} + io.on('connection', async (socket) => { console.log("periph connected"); + + // Rate limit WebSocket connections by IP + const ip = socket.handshake.address; + try { + await wsConnectionRateLimiter.consume(ip); + } catch (rejRes) { + socket.emit('error', { + type: 'error', + code: 429, + message: 'Too many connection attempts. Please wait.' + }); + socket.disconnect(true); + return; + } + const token = socket.handshake.auth.token ?? socket.handshake.headers['authorization']?.split(' ')[1]; try { if (!token) throw new Error("no token!"); @@ -144,6 +206,8 @@ io.on('connection', async (socket) => { // Device reports its calibration status after connection socket.on('report_calib_status', async (data) => { + if (!await rateLimitSocketMessage(socket, 'report_calib_status')) return; + console.log(`Device reporting calibration status: ${socket.id}`); console.log(data); try { @@ -165,6 +229,8 @@ io.on('connection', async (socket) => { // Device reports calibration error (motor stall, sensor failure, etc.) socket.on('device_calib_error', async (data) => { + if (!await rateLimitSocketMessage(socket, 'device_calib_error')) return; + console.log(`Device reporting calibration error: ${socket.id}`); console.log(data); try { @@ -193,6 +259,8 @@ io.on('connection', async (socket) => { // Device acknowledges ready for stage 1 (tilt up) socket.on('calib_stage1_ready', async (data) => { + if (!await rateLimitSocketMessage(socket, 'calib_stage1_ready')) return; + console.log(`Device ready for stage 1 (tilt up): ${socket.id}`); console.log(data); try { @@ -217,6 +285,8 @@ io.on('connection', async (socket) => { // Device acknowledges ready for stage 2 (tilt down) socket.on('calib_stage2_ready', async (data) => { + if (!await rateLimitSocketMessage(socket, 'calib_stage2_ready')) return; + console.log(`Device ready for stage 2 (tilt down): ${socket.id}`); console.log(data); try { @@ -241,6 +311,8 @@ io.on('connection', async (socket) => { // User confirms stage 1 complete (tilt up done) socket.on('user_stage1_complete', async (data) => { + if (!await rateLimitSocketMessage(socket, 'user_stage1_complete')) return; + console.log(`User confirms stage 1 complete: ${socket.id}`); console.log(data); try { @@ -269,6 +341,8 @@ io.on('connection', async (socket) => { // User confirms stage 2 complete (tilt down done) socket.on('user_stage2_complete', async (data) => { + if (!await rateLimitSocketMessage(socket, 'user_stage2_complete')) return; + console.log(`User confirms stage 2 complete: ${socket.id}`); console.log(data); try { @@ -297,6 +371,8 @@ io.on('connection', async (socket) => { // Device acknowledges calibration complete socket.on('calib_done', async (data) => { + if (!await rateLimitSocketMessage(socket, 'calib_done')) return; + console.log(`Received 'calib_done' event from client ${socket.id}:`); console.log(data); try { @@ -323,6 +399,8 @@ io.on('connection', async (socket) => { }); socket.on('pos_hit', async (data) => { + if (!await rateLimitSocketMessage(socket, 'pos_hit')) return; + console.log(`Received 'pos_hit' event from client ${socket.id}:`); console.log(data); const dateTime = new Date();