Skip to content

nodejs pub/sub #293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,665 changes: 1,665 additions & 0 deletions nodejs/nodejs-pub-sub/package-lock.json

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions nodejs/nodejs-pub-sub/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "nodejs_app-pub-sub",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@google-cloud/pubsub": "^4.0.7",
"express": "^4.18.2"
}
}
48 changes: 48 additions & 0 deletions nodejs/nodejs-pub-sub/src/controllers/emailController.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const {
listenForPullMessages,
listenForPushMessages,
} = require("../helper/pub-sub-config");
const subscriptionName = "email_subscription_pull";
const timeout = 60;

const welcome = (req, res) => {
return res.status(200).json({
success: true,
message: "Welcome to Email Service:)",
});
};

const pullEmail = async (req, res) => {
try {
await listenForPullMessages(subscriptionName, timeout);
return res.status(200).json({
success: true,
message: "Pull message received successfully :",
});
} catch (error) {
return res.status(500).json({
success: false,
message: "Couldn't receive pull message :(",
data: error.message,
});
}
};

const pushEmail = async (req, res) => {
try {
let messageResponse = await listenForPushMessages(req.body.message.data);
return res.status(200).json({
success: true,
message: "Push Message received successfully :)",
data: messageResponse,
});
} catch (error) {
return res.status(500).json({
success: false,
message: "Couldn't receive push message :(",
data: error,
});
}
};

module.exports = { welcome, pullEmail, pushEmail };
22 changes: 22 additions & 0 deletions nodejs/nodejs-pub-sub/src/controllers/userController.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const { publishMessage } = require("../helper/pub-sub-config");
const topicName = "user_creation";

const welcome = (req, res) => {
return res.status(200).json({
success: true,
message: "Welcome to User Profile Service:)",
});
};

const createUser = async (req, res) => {
console.log("hefd");
let userObj = req.body;
// create user profile logic goes here....

let messageId = await publishMessage(topicName, userObj);
return res.status(200).json({
success: true,
message: `Message ${messageId} published :)`,
});
};
module.exports = { welcome, createUser };
14 changes: 14 additions & 0 deletions nodejs/nodejs-pub-sub/src/email-sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const express = require("express");
const app = express();
const emailRoute = require("./routes/email");
const PORT = 5000;

app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use("/api/email", emailRoute);

app.listen(PORT, () => {
console.log(
`Email Notification Service is running at http://localhost:${PORT}`
);
});
13 changes: 13 additions & 0 deletions nodejs/nodejs-pub-sub/src/helper/nodejs-pub-sub.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"type": "service_account",
"project_id": "nodejs-pub-sub-407703",
"private_key_id": "6a6d03017047fba913438f0d60b582a9a562d9ad",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC4gMe56Nf42dko\nNhzYqCEyt0gqULFb2g4EKIEoB+knzUTpi5fh+F+o168R8cFBQkg8Sc3cJBJgIBqf\nRYH9tj8T+qYD4oFVsCXobrIe4juydpx81O2RFjnmlm2e2MRseXIhA8ztOpGv1M0C\n68Mk30ec+Z8rylOA1mwLEXqMqByyYkmcX9bQjOmHpDp4Uds6Wy1SIy5FlHOd0/F2\nVQTfVxaSpzDehPoPYk/zjYqlCnPUaUp7wQWavxhj3RVkpo9EXM+OU9gpuQURvFd4\n6ShRpH+k5f3/ULETDQKYF3XQaU9LueVklN4y9G2ttLDxtuwBMysGv0YxSX6R+g6K\nAIuNRzcLAgMBAAECggEAB8hi5Gw2hAhIeUJDGD6LW/TQRfyuZpX9pl5BCHvA8Y1e\nnU1mpEESY/BpMaQVh1ew7V4doW4iGkzwYoyPjbC8225kx2AsHX23M6VLvTLE2uPW\n+QsQVGDwI1JwIFW7f2nmDw92ewFP+O9MkJV6X1lNz4jWFoZL3HDvEM51cW57+2hI\nn4qrGPqg+wP7HxbMRp4V8BtHAEbYhh1NGjj8MCEVFms9E5759UbiKoyp7nI2PN9t\nEROPt2z/MvLW3JmeKQ0zjhL0Bt0jroFcxT2ZQNuOxJEuwiS5wKVWYjNizxjCw9xc\nzuF7QclYMHqbdZqQdzFjpyrhu3+3FcV/0vwT225UBQKBgQDk1YyZxKEuFO7Zxokw\nP0GEirRJD0Um9PCDUlosZ8p5BeMq0ehd7kVKdrVbvnDavp3WSTxLytD6oRq9It3/\neq3Sw761WLun9WzI8WtuJPp2hBbn//cvnnSgn6f0Z8ySig/OdIrugbNJfAfKiI4v\nD0j/TuVeCn7LnhQzRM90Z+UM1wKBgQDOZ/kZnMbVdL/UQY6SfstEGidZj/aaxhMN\n4L4DkBw42CkNs7VT1FxGOR8SggUvau19sFqEqNUCZtj1FcyQ1kpRiyYkdC+8PxQ4\nHxgA+SX5APVM+SWQs1NAiZCz4PFjkaNcOmsQT10zxIQFjVF6D+KXdMHivs3LH41d\nqAIEQRnM7QKBgQCdwCWEH4wpk16xHG+YphLJh0EPmIpId9SSAySMtiRbV9apvzjc\nABUeZ0VQ8LEl6wAuKCB7814rWQJw3meB5pWL1UtUAs6i08rhn6Q6sx1CtH6CAu/p\nkOe/jzCeiSv63VhWL/tSvnDVOL66PXcfM/9TmZT/RmbwgZJVYZtOGQ1K0wKBgDYJ\nCDCJV5BoKRQjOXTmlsY0ZprO0ouAUVqvvG7oWksTY+P/aPgSPznvGFBbE4pvXs8p\nzPivzlv7ms7GrzA3uWsRl6GxfRG2Hc/3a+xNTYCbnJxTGV+BYskhem1s9STQ6shp\nTsANVuQHjVx6u5rN788gtOVseCm2D+c15ZlBhcSdAoGABUdfMN5Bk3cAgHNqANiX\nHmq5J1007fUiqO5J+OqwZGDotQc2peon9AUWyBDJN1v1OoTGOGtd2Aa067nWWxjO\nMaZyZchJMHjbQHsMZ6SXxcJeqZNyhJC2ErHcrOiLra5gRGCkOidOYLaocg07fMlS\nfp7GS61uvMnmWpl1Z1ng1go=\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "110098489186108810124",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/nodejs-app-pub-sub%40nodejs-pub-sub-407703.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
60 changes: 60 additions & 0 deletions nodejs/nodejs-pub-sub/src/helper/pub-sub-config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const { PubSub } = require("@google-cloud/pubsub");
const path = require("path");

const keyFilePath = path.join(__dirname, "nodejs-pub-sub.json");
const projectId = "nodejs-pub-sub";

// Create an instance of PubSub with the provided service account key
const pubSubClient = new PubSub({
keyFilename: keyFilePath,
});

const publishMessage = async (topicName, payload) => {
const dataBuffer = Buffer.from(JSON.stringify(payload));
try {
const messageId = await pubSubClient
.topic(topicName)
.publishMessage({ data: dataBuffer });
console.log(`Message ${messageId} published.`);
return messageId;
} catch (error) {
console.error(`Received error while publishing: ${error.message}`);
}
};

const listenForPullMessages = async (subscriptionName, timeout) => {
const subscription = pubSubClient.subscription(subscriptionName);
let messageCount = 0;
let data = [];
const messageHandler = message => {
const jsonData = JSON.parse(message.data);

data.push({
id: message.id,
attributes: message.attributes,
...jsonData,
});
messageCount += 1;
message.ack();
};
subscription.on("message", messageHandler);

setTimeout(() => {
console.log("Message Pulled: \n", data);
console.log(`${messageCount} message(s) received.`);
subscription.removeListener("message", messageHandler);
}, timeout * 100);
};

const listenForPushMessages = payload => {
const message = Buffer.from(payload, "base64").toString("utf-8");
let parsedMessage = JSON.parse(message);
console.log("Message Pushed: \n", parsedMessage);
return parsedMessage;
};

module.exports = {
publishMessage,
listenForPullMessages,
listenForPushMessages,
};
9 changes: 9 additions & 0 deletions nodejs/nodejs-pub-sub/src/routes/email.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const express = require("express");
const router = express();
const emailController = require("../controllers/emailController");

router.get("/", emailController.welcome);
router.post("/pull", emailController.pullEmail);
router.post("/push", emailController.pushEmail);

module.exports = router;
8 changes: 8 additions & 0 deletions nodejs/nodejs-pub-sub/src/routes/user.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const express = require("express");
const router = express();
const userController = require("../controllers/userController");

router.get("/", userController.welcome);
router.post("/create", userController.createUser);

module.exports = router;
13 changes: 13 additions & 0 deletions nodejs/nodejs-pub-sub/src/user-pub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const express = require("express");
const app = express();
const userRoute = require("./routes/user");
const PORT = 3000;

app.use(express.urlencoded({ extended: true }));
app.use(express.json());

app.use("/api/user", userRoute);

app.listen(PORT, () => {
console.log(`User service is running at http://localhost:${PORT}`);
});