rateLimit
This commit is contained in:
78
index.js
78
index.js
@@ -11,10 +11,41 @@ const { initializeAgenda } = require('./agenda'); // Agenda setup (CHANGE THIS F
|
||||
const format = require('pg-format');
|
||||
const cronParser = require('cron-parser');
|
||||
const { ObjectId } = require('mongodb');
|
||||
const { RateLimiterMemory } = require('rate-limiter-flexible');
|
||||
|
||||
// HTTP rate limiter: 5 requests per second per IP
|
||||
const httpRateLimiter = new RateLimiterMemory({
|
||||
points: 5,
|
||||
duration: 1,
|
||||
});
|
||||
|
||||
// WebSocket connection rate limiter: 1 connection per second per IP
|
||||
const wsConnectionRateLimiter = new RateLimiterMemory({
|
||||
points: 5,
|
||||
duration: 1,
|
||||
});
|
||||
|
||||
// WebSocket message rate limiter: 5 messages per second per socket
|
||||
const wsMessageRateLimiter = new RateLimiterMemory({
|
||||
points: 5,
|
||||
duration: 1,
|
||||
});
|
||||
|
||||
const app = express();
|
||||
const port = 3000;
|
||||
app.use(json());
|
||||
|
||||
// Rate limiting middleware for HTTP requests
|
||||
app.use(async (req, res, next) => {
|
||||
const ip = req.ip || req.connection.remoteAddress;
|
||||
try {
|
||||
await httpRateLimiter.consume(ip);
|
||||
next();
|
||||
} catch (rejRes) {
|
||||
res.status(429).json({ error: 'Too many requests' });
|
||||
}
|
||||
});
|
||||
|
||||
const pool = new Pool();
|
||||
const server = http.createServer(app);
|
||||
const io = socketIo(server, {
|
||||
@@ -38,8 +69,39 @@ let agenda;
|
||||
const JWT_SECRET = process.env.JWT_SECRET;
|
||||
const TOKEN_EXPIRY = '5d';
|
||||
|
||||
// Helper function to rate limit WebSocket messages
|
||||
async function rateLimitSocketMessage(socket, eventName) {
|
||||
try {
|
||||
await wsMessageRateLimiter.consume(socket.id);
|
||||
return true;
|
||||
} catch (rejRes) {
|
||||
socket.emit('error', {
|
||||
type: 'error',
|
||||
code: 429,
|
||||
message: 'Too many messages. Please slow down.',
|
||||
event: eventName
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
io.on('connection', async (socket) => {
|
||||
console.log("periph connected");
|
||||
|
||||
// Rate limit WebSocket connections by IP
|
||||
const ip = socket.handshake.address;
|
||||
try {
|
||||
await wsConnectionRateLimiter.consume(ip);
|
||||
} catch (rejRes) {
|
||||
socket.emit('error', {
|
||||
type: 'error',
|
||||
code: 429,
|
||||
message: 'Too many connection attempts. Please wait.'
|
||||
});
|
||||
socket.disconnect(true);
|
||||
return;
|
||||
}
|
||||
|
||||
const token = socket.handshake.auth.token ?? socket.handshake.headers['authorization']?.split(' ')[1];
|
||||
try {
|
||||
if (!token) throw new Error("no token!");
|
||||
@@ -144,6 +206,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// Device reports its calibration status after connection
|
||||
socket.on('report_calib_status', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'report_calib_status')) return;
|
||||
|
||||
console.log(`Device reporting calibration status: ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -165,6 +229,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// Device reports calibration error (motor stall, sensor failure, etc.)
|
||||
socket.on('device_calib_error', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'device_calib_error')) return;
|
||||
|
||||
console.log(`Device reporting calibration error: ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -193,6 +259,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// Device acknowledges ready for stage 1 (tilt up)
|
||||
socket.on('calib_stage1_ready', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'calib_stage1_ready')) return;
|
||||
|
||||
console.log(`Device ready for stage 1 (tilt up): ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -217,6 +285,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// Device acknowledges ready for stage 2 (tilt down)
|
||||
socket.on('calib_stage2_ready', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'calib_stage2_ready')) return;
|
||||
|
||||
console.log(`Device ready for stage 2 (tilt down): ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -241,6 +311,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// User confirms stage 1 complete (tilt up done)
|
||||
socket.on('user_stage1_complete', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'user_stage1_complete')) return;
|
||||
|
||||
console.log(`User confirms stage 1 complete: ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -269,6 +341,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// User confirms stage 2 complete (tilt down done)
|
||||
socket.on('user_stage2_complete', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'user_stage2_complete')) return;
|
||||
|
||||
console.log(`User confirms stage 2 complete: ${socket.id}`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -297,6 +371,8 @@ io.on('connection', async (socket) => {
|
||||
|
||||
// Device acknowledges calibration complete
|
||||
socket.on('calib_done', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'calib_done')) return;
|
||||
|
||||
console.log(`Received 'calib_done' event from client ${socket.id}:`);
|
||||
console.log(data);
|
||||
try {
|
||||
@@ -323,6 +399,8 @@ io.on('connection', async (socket) => {
|
||||
});
|
||||
|
||||
socket.on('pos_hit', async (data) => {
|
||||
if (!await rateLimitSocketMessage(socket, 'pos_hit')) return;
|
||||
|
||||
console.log(`Received 'pos_hit' event from client ${socket.id}:`);
|
||||
console.log(data);
|
||||
const dateTime = new Date();
|
||||
|
||||
Reference in New Issue
Block a user