Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 102 additions & 110 deletions defaultmodules/newsfeed/newsfeedfetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,71 @@ const FeedMe = require("feedme");
const iconv = require("iconv-lite");
const { htmlToText } = require("html-to-text");
const Log = require("logger");
const NodeHelper = require("node_helper");
const { getUserAgent } = require("#server_functions");
const { scheduleTimer } = require("#module_functions");
const HTTPFetcher = require("#http_fetcher");

/**
* Responsible for requesting an update on the set interval and broadcasting the data.
* @param {string} url URL of the news feed.
* @param {number} reloadInterval Reload interval in milliseconds.
* @param {string} encoding Encoding of the feed.
* @param {boolean} logFeedWarnings If true log warnings when there is an error parsing a news article.
* @param {boolean} useCorsProxy If true cors proxy is used for article url's.
* NewsfeedFetcher - Fetches and parses RSS/Atom feed data
* Uses HTTPFetcher for HTTP handling with intelligent error handling
* @class
*/
const NewsfeedFetcher = function (url, reloadInterval, encoding, logFeedWarnings, useCorsProxy) {
let reloadTimer = null;
let items = [];
let reloadIntervalMS = reloadInterval;
class NewsfeedFetcher {

let fetchFailedCallback = function () {};
let itemsReceivedCallback = function () {};
/**
* Creates a new NewsfeedFetcher instance
* @param {string} url - The URL of the news feed to fetch
* @param {number} reloadInterval - Time in ms between fetches
* @param {string} encoding - Encoding of the feed (e.g., 'UTF-8')
* @param {boolean} logFeedWarnings - If true, log a warning when a news article cannot be parsed
* @param {boolean} useCorsProxy - If true, a CORS proxy is used for article URLs
*/
constructor (url, reloadInterval, encoding, logFeedWarnings, useCorsProxy) {
this.url = url;
this.encoding = encoding;
this.logFeedWarnings = logFeedWarnings;
this.useCorsProxy = useCorsProxy;
this.items = [];
this.fetchFailedCallback = () => {};
this.itemsReceivedCallback = () => {};

// Use HTTPFetcher for HTTP handling (Composition)
this.httpFetcher = new HTTPFetcher(url, {
reloadInterval: Math.max(reloadInterval, 1000),
headers: {
"Cache-Control": "max-age=0, no-cache, no-store, must-revalidate",
Pragma: "no-cache"
}
});

if (reloadIntervalMS < 1000) {
reloadIntervalMS = 1000;
// Wire up HTTPFetcher events
this.httpFetcher.on("response", (response) => this.#handleResponse(response));
this.httpFetcher.on("error", (errorInfo) => this.fetchFailedCallback(this, errorInfo));
}

/* private methods */

/**
* Request the new items.
* Creates a parse error info object
* @param {string} message - Error message
* @param {Error} error - Original error
* @returns {object} Error info object
*/
const fetchNews = () => {
clearTimeout(reloadTimer);
reloadTimer = null;
items = [];
#createParseError (message, error) {
  // Describes a failure raised while streaming/parsing the feed body:
  // status is reported as null, retryCount as 0, and retryAfter mirrors
  // the HTTPFetcher's current reload interval.
  const { url, httpFetcher } = this;
  const errorInfo = {
    message,
    status: null,
    errorType: "PARSE_ERROR",
    translationKey: "MODULE_ERROR_UNSPECIFIED",
    retryAfter: httpFetcher.reloadInterval,
    retryCount: 0,
    url,
    originalError: error
  };
  return errorInfo;
}

/**
* Handles successful HTTP response
* @param {Response} response - The fetch Response object
*/
#handleResponse (response) {
this.items = [];
const parser = new FeedMe();

parser.on("item", (item) => {
Expand All @@ -58,118 +88,80 @@ const NewsfeedFetcher = function (url, reloadInterval, encoding, logFeedWarnings
]
});

items.push({
title: title,
description: description,
pubdate: pubdate,
url: url,
useCorsProxy: useCorsProxy,
this.items.push({
title,
description,
pubdate,
url,
useCorsProxy: this.useCorsProxy,
hash: crypto.createHash("sha256").update(`${pubdate} :: ${title} :: ${url}`).digest("hex")
});
} else if (logFeedWarnings) {
} else if (this.logFeedWarnings) {
Log.warn("Can't parse feed item:", item);
Log.warn(`Title: ${title}`);
Log.warn(`Description: ${description}`);
Log.warn(`Pubdate: ${pubdate}`);
}
});

parser.on("end", () => {
this.broadcastItems();
});
parser.on("end", () => this.broadcastItems());

parser.on("error", (error) => {
fetchFailedCallback(this, error);
scheduleTimer(reloadTimer, reloadIntervalMS, fetchNews);
});

//"end" event is not broadcast if the feed is empty but "finish" is used for both
parser.on("finish", () => {
scheduleTimer(reloadTimer, reloadIntervalMS, fetchNews);
Log.error(`${this.url} - Feed parsing failed: ${error.message}`);
this.fetchFailedCallback(this, this.#createParseError(`Feed parsing failed: ${error.message}`, error));
});

parser.on("ttl", (minutes) => {
try {
// 86400000 = 24 hours is mentioned in the docs as maximum value:
const ttlms = Math.min(minutes * 60 * 1000, 86400000);
if (ttlms > reloadIntervalMS) {
reloadIntervalMS = ttlms;
Log.info(`reloadInterval set to ttl=${reloadIntervalMS} for url ${url}`);
}
} catch (error) {
Log.warn(`feed ttl is no valid integer=${minutes} for url ${url}`);
const ttlms = Math.min(minutes * 60 * 1000, 86400000);
if (ttlms > this.httpFetcher.reloadInterval) {
this.httpFetcher.reloadInterval = ttlms;
Log.info(`reloadInterval set to ttl=${ttlms} for url ${this.url}`);
}
});

const headers = {
"User-Agent": getUserAgent(),
"Cache-Control": "max-age=0, no-cache, no-store, must-revalidate",
Pragma: "no-cache"
};

fetch(url, { headers: headers })
.then(NodeHelper.checkFetchStatus)
.then((response) => {
let nodeStream;
if (response.body instanceof stream.Readable) {
nodeStream = response.body;
} else {
nodeStream = stream.Readable.fromWeb(response.body);
}
nodeStream.pipe(iconv.decodeStream(encoding)).pipe(parser);
})
.catch((error) => {
fetchFailedCallback(this, error);
scheduleTimer(reloadTimer, reloadIntervalMS, fetchNews);
});
};

/* public methods */
try {
const nodeStream = response.body instanceof stream.Readable
? response.body
: stream.Readable.fromWeb(response.body);
nodeStream.pipe(iconv.decodeStream(this.encoding)).pipe(parser);
} catch (error) {
Log.error(`${this.url} - Stream processing failed: ${error.message}`);
this.fetchFailedCallback(this, this.#createParseError(`Stream processing failed: ${error.message}`, error));
}
}

/**
* Update the reload interval, but only if we need to increase the speed.
* @param {number} interval Interval for the update in milliseconds.
* @param {number} interval - Interval for the update in milliseconds.
*/
this.setReloadInterval = function (interval) {
if (interval > 1000 && interval < reloadIntervalMS) {
reloadIntervalMS = interval;
setReloadInterval (interval) {
if (interval > 1000 && interval < this.httpFetcher.reloadInterval) {
this.httpFetcher.reloadInterval = interval;
}
};
}

/**
* Start fetching the feed periodically via the underlying HTTPFetcher.
*/
this.startFetch = function () {
fetchNews();
};
startFetch () {
// Scheduling is delegated to the HTTPFetcher instance created in the constructor.
// NOTE(review): presumably this fetches right away and then repeats every reloadInterval — confirm against HTTPFetcher.
this.httpFetcher.startPeriodicFetch();
}

/**
* Broadcast the existing items.
*/
this.broadcastItems = function () {
if (items.length <= 0) {
broadcastItems () {
if (this.items.length <= 0) {
Log.info("No items to broadcast yet.");
return;
}
Log.info(`Broadcasting ${items.length} items.`);
itemsReceivedCallback(this);
};

this.onReceive = function (callback) {
itemsReceivedCallback = callback;
};

this.onError = function (callback) {
fetchFailedCallback = callback;
};

this.url = function () {
return url;
};

this.items = function () {
return items;
};
};
Log.info(`Broadcasting ${this.items.length} items.`);
this.itemsReceivedCallback(this);
}

/**
 * Registers the callback that broadcastItems() invokes when there are items to deliver.
 * Replaces the no-op callback installed by the constructor.
 * @param {function(NewsfeedFetcher): void} callback - Called when items are received; receives this fetcher instance
 */
onReceive (callback) {
this.itemsReceivedCallback = callback;
}

/**
 * Registers the callback invoked when fetching or parsing the feed fails.
 * It is called for HTTPFetcher "error" events as well as for feed parsing and
 * stream processing errors, and replaces the no-op callback installed by the constructor.
 * @param {function(NewsfeedFetcher, object): void} callback - Called on fetch error; receives this fetcher instance and an error info object
 */
onError (callback) {
this.fetchFailedCallback = callback;
}
}

module.exports = NewsfeedFetcher;
14 changes: 6 additions & 8 deletions defaultmodules/newsfeed/node_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ module.exports = NodeHelper.create({
const url = feed.url || "";
const encoding = feed.encoding || "UTF-8";
const reloadInterval = feed.reloadInterval || config.reloadInterval || 5 * 60 * 1000;
let useCorsProxy = feed.useCorsProxy;
if (useCorsProxy === undefined) useCorsProxy = true;
const useCorsProxy = feed.useCorsProxy ?? true;

try {
new URL(url);
Expand All @@ -46,11 +45,10 @@ module.exports = NodeHelper.create({
this.broadcastFeeds();
});

fetcher.onError((fetcher, error) => {
Log.error("Error: Could not fetch newsfeed: ", url, error);
let error_type = NodeHelper.checkFetchError(error);
fetcher.onError((fetcher, errorInfo) => {
Log.error("Error: Could not fetch newsfeed: ", fetcher.url, errorInfo.message || errorInfo);
this.sendSocketNotification("NEWSFEED_ERROR", {
error_type
error_type: errorInfo.translationKey
});
});

Expand All @@ -71,8 +69,8 @@ module.exports = NodeHelper.create({
*/
broadcastFeeds () {
const feeds = {};
for (let f in this.fetchers) {
feeds[f] = this.fetchers[f].items();
for (const url in this.fetchers) {
feeds[url] = this.fetchers[url].items;
}
this.sendSocketNotification("NEWS_ITEMS", feeds);
}
Expand Down
18 changes: 0 additions & 18 deletions js/module_functions.js

This file was deleted.

3 changes: 0 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
],
"type": "commonjs",
"imports": {
"#module_functions": {
"default": "./js/module_functions.js"
},
"#server_functions": {
"default": "./js/server_functions.js"
},
Expand Down