Direct Control

This commit is contained in:
Aditya Pulipaka
2025-07-10 18:31:11 -05:00
parent 53fde4ba6a
commit 2549bc3b20
5 changed files with 336 additions and 83 deletions

245
index.js
View File

@@ -6,62 +6,162 @@ require('dotenv').config();
const jwt = require('jsonwebtoken');
const http = require('http');
const socketIo = require('socket.io');
const connectDB = require('./db'); // Your Mongoose DB connection
const { initializeAgenda } = require('./agenda'); // Agenda setup
const format = require('pg-format');
const app = express();
const port = 3000;
app.use(json());
const pool = new Pool();
const server = http.createServer(app);
const io = socketIo(server)
const io = socketIo(server, {
pingInterval: 15000,
pingTimeout: 10000,
});
let agenda;
(async () => {
// 1. Connect to MongoDB (for your application data)
await connectDB();
agenda = await initializeAgenda('mongodb://localhost:27017/myScheduledApp', pool, io);
})();
(async () => {
await pool.query("update user_tokens set connected=FALSE where connected=TRUE");
await pool.query("update device_tokens set connected=FALSE where connected=TRUE");
})();
const JWT_SECRET = process.env.JWT_SECRET;
const TOKEN_EXPIRY = '5d';
io.on('connection', (socket) => {
io.on('connection', async (socket) => {
console.log("periph connected");
socket.on("authenticate", async (data) => {
try {
const authData = JSON.parse(data);
const token = authData.token;
const periphId = authData.id;
const token = socket.handshake.auth.token ?? socket.handshake.headers['authorization']?.split(' ')[1];
try {
if (!token) throw new Error("no token!");
const payload = jwt.verify(token, JWT_SECRET);
let table;
let idCol;
let id;
if (payload.type === "user") {
table = "user_tokens";
idCol = "user_id";
socket.user = true;
id = payload.userId;
}
else if (payload.type === "peripheral") {
table = "device_tokens";
idCol = "device_id";
socket.user = false;
id = payload.peripheralId;
}
else {
throw new Error("Unauthorized");
}
const {rows} = await pool.query("select * from device_tokens where device_id=$1 and token=$2 and connected=FALSE",
[periphId, token]
);
const query = format(`update %I set connected=TRUE, socket=$1 where %I=$2 and token=$3 and connected=FALSE`,
table, idCol
);
if (rows.length != 1) {
const errorResponse = {
type: 'error',
code: 404,
message: 'Device not found'
};
socket.emit("error", errorResponse);
socket.disconnect(true);
}
else {
await pool.query("update device_tokens set connected=TRUE where device_id=$1 and token=$2 and connected=FALSE",
[periphId, token]
);
const successResponse = {
type: 'success',
code: 200,
message: 'Device found'
};
socket.emit("success", successResponse);
}
const result = await pool.query(query, [socket.id, id, token]);
} catch (error) {
console.error('Error during periph authentication:', error);
// Send an error message to the client
socket.emit('error', { type: 'error', code: 500 });
// Disconnect the client
if (result.rowCount != 1) {
const errorResponse = {
type: 'error',
code: 404,
message: 'Device not found or already connected'
};
socket.emit("error", errorResponse);
socket.disconnect(true);
}
})
})
else {
const successResponse = {
type: 'success',
code: 200,
message: 'Device found'
};
console.log("success");
socket.emit("success", successResponse);
}
} catch (error) {
console.error('Error during periph authentication:', error);
// Send an error message to the client
socket.emit('error', { type: 'error', code: 500 });
// Disconnect the client
socket.disconnect(true);
}
socket.on('calib_done', async (data) => {
console.log(`Received 'calib_done' event from client ${socket.id}:`);
console.log(data);
try {
const {rows} = await pool.query("select device_id from device_tokens where socket=$1 and connected=TRUE", [socket.id]);
if (rows.length != 1) throw new Error("No device with that ID connected to Socket.");
const result = await pool.query("update peripherals set await_calib=FALSE, calibrated=TRUE where device_id=$1 and peripheral_number=$2 returning id, user_id",
[rows[0].device_id, data.port]
);
if (result.rowCount != 1) throw new Error("No such peripheral in database");
const {rows: userRows} = await pool.query("select socket from user_tokens where user_id=$1", [result.rows[0].user_id]);
if (userRows.length == 1) {
if (userRows[0]){
const userSocket = userRows[0].socket;
io.to(userSocket).emit("calib_done", {periphID: result.rows[0].id});
}
}
else console.log("No App connected");
} catch (error) {
console.error(`Error processing job:`, error);
throw error;
}
});
socket.on('pos_hit', async (data) => {
console.log(`Received 'pos_hit' event from client ${socket.id}:`);
console.log(data);
const dateTime = new Date();
try {
const {rows} = await pool.query("select device_id from device_tokens where socket=$1 and connected=TRUE", [socket.id]);
if (rows.length != 1) throw new Error("No device with that ID connected to Socket.");
const result = await pool.query("update peripherals set last_pos=$1, last_set=$2 where device_id=$3 and peripheral_number=$4 returning id, user_id",
[data.pos, dateTime, rows[0].device_id, data.port]
);
if (result.rowCount != 1) throw new Error("No such peripheral in database");
const {rows: userRows} = await pool.query("select socket from user_tokens where user_id=$1", [result.rows[0].user_id]);
if (userRows.length == 1) {
if (userRows[0]){
const userSocket = userRows[0].socket;
io.to(userSocket).emit("pos_hit", {periphID: result.rows[0].id});
}
}
else console.log("No App connected");
} catch (error) {
console.error(`Error processing job:`, error);
throw error;
}
});
socket.on("disconnect", async () => {
if (socket.user) {
console.log("user disconnect");
await pool.query("update user_tokens set connected=FALSE where socket=$1", [socket.id]);
}
else {
console.log("device disconnect");
await pool.query("update device_tokens set connected=FALSE where socket=$1", [socket.id]);
}
});
});
async function createToken(userId) {
const token = jwt.sign({ type: 'user', userId }, JWT_SECRET, { expiresIn: TOKEN_EXPIRY });
@@ -99,7 +199,10 @@ async function authenticateToken(req, res, next) {
const {rows} = await pool.query("select device_id from device_tokens where token=$1", [token]);
if (rows.length != 1) throw new Error("Invalid/Expired Token");
req.peripheral = payload.peripheralId;
}
}
else {
throw new Error("Invalid/Expired Token");
}
next();
} catch {
return res.sendStatus(403); // invalid/expired token
@@ -107,7 +210,6 @@ async function authenticateToken(req, res, next) {
}
app.get('/', (req, res) => {
// console.log(req);
res.send('Hello World!');
});
@@ -255,16 +357,14 @@ app.get('/verify_device', authenticateToken, async (req, res) => {
app.get('/position', authenticateToken, async (req, res) => {
console.log("devicepos");
try {
const {rows} = await pool.query("select * from peripherals where device_id=$1", [req.peripheral]);
const {rows} = await pool.query("select last_pos, peripheral_number, await_calib from peripherals where device_id=$1",
[req.peripheral]);
if (rows.length == 0) {
return res.sendStatus(404);
}
const lastPosList = rows.map(row => row.last_pos);
const portNums = rows.map(row => row.peripheral_number);
const awaitCalibList = rows.map(row => row.await_calib);
const ready = rows.map(row => row.calibrated);
res.status(200).json({positions: lastPosList, port_nums: portNums, calib_needed: awaitCalibList, ready: ready});
res.status(200).json(rows);
} catch {
res.status(500).json({error: "server error"});
}
@@ -273,17 +373,20 @@ app.get('/position', authenticateToken, async (req, res) => {
app.post('/manual_position_update', authenticateToken, async (req, res) => {
console.log("setpos");
try {
const {periphId, newPos, time} = req.body;
const dateTime = new Date(time);
const result = await pool.query("update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4",
[newPos, dateTime, periphId, req.user]
);
if (result.rowCount === 0) return res.sendStatus(404);
const {periphId, periphNum, deviceId, newPos} = req.body;
const changedPosList = [{periphNum: periphNum, periphID: periphId, pos: newPos}];
res.sendStatus(204);
} catch {
res.status(500).json({error: "server error"});
// Schedule the job to run immediately
const job = await agenda.now('posChange', {deviceID: deviceId, changedPosList: changedPosList, userID: req.user});
res.status(202).json({ // 202 Accepted, as processing happens in background
success: true,
message: 'Request accepted for immediate processing.',
jobId: job.attrs._id
});
} catch (error) {
console.error('Error triggering immediate action:', error);
res.status(500).json({ success: false, message: 'Failed to trigger immediate action', error: error.message });
}
});
@@ -291,11 +394,16 @@ app.post('/calib', authenticateToken, async (req, res) => {
console.log("calibrate");
try {
const {periphId} = req.body;
const result = await pool.query("update peripherals set await_calib=true where id=$1 and user_id=$2",
[periphId, req.user]);
if (result.rowCount === 0) return res.sendStatus(404);
res.sendStatus(204);
} catch {
// Schedule the job to run immediately
const job = await agenda.now('calib', {periphID: periphId, userID: req.user});
res.status(202).json({ // 202 Accepted, as processing happens in background
success: true,
message: 'Request accepted for immediate processing.',
jobId: job.attrs._id
});
} catch (err) {
console.error('Error triggering immediate action:', err);
res.sendStatus(500);
}
})
@@ -304,10 +412,13 @@ app.post('/cancel_calib', authenticateToken, async (req, res) => {
console.log("cancelCalib");
try {
const {periphId} = req.body;
const result = await pool.query("update peripherals set await_calib=false where id=$1 and user_id=$2",
[periphId, req.user]);
if (result.rowCount === 0) return res.sendStatus(404);
res.sendStatus(204);
const job = await agenda.now('cancel_calib', {periphID: periphId, userID: req.user});
res.status(202).json({ // 202 Accepted, as processing happens in background
success: true,
message: 'Request accepted for immediate processing.',
jobId: job.attrs._id
});
} catch {
res.sendStatus(500);
}