var _ = require('lodash'), util = require('util'), Bluebird = require('bluebird'), EventEmitter = require('events').EventEmitter, Counts = require('./counts').Counts, RollingCounts = require('./counts').RollingCounts; var defaults = { /** * Bluebird Promise * @constructor * @external Promise * @see {@link https://github.com/petkaantonov/bluebird/blob/master/API.md#core} */ Promise: Bluebird, name: "CircuitBreaker", isErrorHandler: isError, concurrency: 0, timeout: 3000, resetTime: 1000, volumeThreshold: 10, intervalSize: 1000, windowSize: 1000, windowCount: 10, errorThreshold: 0.05, errorNamesThresholds: {}, emitIntervalEvent: true, emitCallbackEvent: false }; function isError(error) { return error; } /** * Create a new circuit breaker wrapping function *fn*. If the function * is a method of an object, pass the object as the *fnThis* parameter. * * You can configure it's behaviour with the *options* parameter, or through * various setters: * * @example * var cb = new CircuitBreaker(fn) * .setTimeout(1000) * .setErrorThreshold(0.5); * * @fires CircuitBreaker#interval * @fires CircuitBreaker#callback * * @constructor * @param {function} fn - The function to be executed * @param {object=} fnThis - If provided, call fn with fnThis as this argument * @param {object=} options - Options. All times are in milliseconds. * @param {function} options.Promise - Promise library (default: require('bluebird') / you can use the navite Promise constructor) * @param {string} options.name - Name for this instance (default: 'CircuitBreaker') * @param {function} options.isErrorHandler - Function to get error from callback * @param {integer} options.concurrency - Set a limit for concurrent calls (default: 0 - unlimited) * @param {integer} options.timeout - Timeout limit for callback to be called (default: 3000) * @param {integer} options.resetTime - When circuit is tripped, time to wait before a test call is allowed (default: 1000) * @param {integer} options.volumeThreshold - Minimum number of calls in health window before circuit is checked * for health. Circuit remains <b>closed</b> until then (default: 10) * @param {integer} options.intervalSize - Frequency with which the 'interval' event is emitted for reporting (default: 1000) * @param {integer} options.windowSize - Time size of each individual rolling window (default: 1000) * @param {integer} options.windowCount - Number of windows in health window (default: 10) * @param {float} options.errorThreshold - Error level between 0 and 1 at which the circuit is tripped (default: 0.05) * @param {object} options.errorNamesThresholds - Custom error levels for particular named errors (default: {}) * @param {boolean} options.emitIntervalEvent - Enable/disable emitting the 'interval' event for reporting (default: true) * @param {boolean} options.emitCallbackEvent - Enable/disable emitting the 'callback' event (default: false) */ function CircuitBreaker(fn, fnThis, options) { options = _.defaults(options || {}, defaults); var opts = _.extend({}, options), active = 0, queue = [], lastErrorMs = null, testing = false, rollingCounts = new RollingCounts(opts.windowCount), interval = makeInterval(), self = this, rollIntervalTimeoutObj, rollWindowTimeoutObj; function defer() { var resolve, reject; var promise = new opts.Promise(function(resolveFn, rejectFn) { resolve = resolveFn; reject = rejectFn; }); return { resolve: resolve, reject: reject, promise: promise }; } function makeInterval(now) { now = now || Date.now(); return _.extend(new Counts(), { start: now, times: [] }); } /** * Set the timeout in milliseconds. Set to 0 to disable. * * @param {integer} ms * @returns {CircuitBreaker} */ this.setTimeout = function(ms) { opts.timeout = ms; return self; }; /** * Set the minimum amount of calls to start checking for circuit health. * Circuit will always be closed until it reaches this threshold. * Set to 0 to instantly start checking for circuit health. * * @param {integer} count - Minimum amount of calls to start checking circuit health. * @returns {CircuitBreaker} */ this.setVolumeThreshold = function(count) { opts.volumeThreshold = count; return self; }; /** * Set the error threshold which tirggers circuit. * Set to 0 open circuit on first error. * * @param {float} level - Float from 0 to 1 * @returns {CircuitBreaker} */ this.setErrorThreshold = function(level) { opts.errorThreshold = level; return self; }; /** * Set the error threshold for a particular error type. * The error type is obtained from the name property of the error. * * @param {string} name - Error name * @param {float} level - Float from 0 to 1 */ this.setErrorNameThreshold = function(name, level) { opts.errorNamesThresholds[name] = level; return self; }; /** * Set the sleep period until a test request is allowed to be made. * The circuit will remain open during this period of time. * * @param {integer} ms * @returns {CircuitBreaker} */ this.setResetTime = function(ms) { opts.resetTime = ms; return self; }; /** * Set the concurrency level. Set to 0 to disable (default behaviour). * * Requests will be queded if more than this number are currently active. * * @param {integer} num - The maximum number of requests allowed to be active. * @returns {CircuitBreaker} */ this.setConcurrency = function(num) { opts.concurrency = num; return self; }; /** * Set the reporting interval size. * * @param {integer} ms - The frequency with which the 'interval' event is emitted. */ this.setIntervalSize = function(ms) { opts.intervalSize = ms; return self; }; /** * Set the size in milliseconds of each individual window used in the rolling counts. * * @param {integer} ms * @returns {CircuitBreaker} */ this.setWindowSize = function(ms) { opts.windowSize = ms; return self; }; /** * Set the total amount of windows to keep in the rolling counts. * * @param {integer} count * @returns {CircuitBreaker} */ this.setWindowCount = function(count) { rollingCounts.setSize(count); return self; }; /** * Enables/disables emitting the 'interval' event for reporting. * * @param {boolean} bool * @returns {CircuitBreaker} */ this.setEmitIntervalEvent = function(bool) { opts.emitIntervalEvent = bool ? true : false; return self; }; /** * Enables/disables emitting the 'callback' event for reporting. * * @param {boolean} bool * @returns {CircuitBreaker} */ this.setEmitCallbackEvent = function(bool) { opts.emitCallbackEvent = bool ? true : false; return self; }; /** * Set a custom function to check if the callback was called with an error or not. * * * @example * var cb = new RequestCircuitBreaker(); * cb.setIsErrorHandler(function(error, response, body) { * if (error) return error; * if (response.statusCode == 503) { * var unavailableError = new Error(); * unavailableError.name = "ServiceUnavailableError"; * return unavailableError; * } * return null; * }); * * @param {function} cb - A function which will be called with the callback arguments. * @returns {CircuitBreaker} */ this.setIsErrorHandler = function(cb) { opts.isErrorHandler = cb; return self; }; /** * Get the current reporting interval. * @returns {object} */ this.getCurrentInterval = function() { return _.cloneDeep(interval); }; /** * @deprecated * @returns {object} */ this.getCurrentInteval = function() { return _.cloneDeep(interval); }; /** * Get the current circuit breaker health rolling counts * * @returns {object} */ this.getCurrentCounts = function() { return _.cloneDeep(rollingCounts.getCurrent()); }; /** * Get the amount of current active requests. That is, * calls which have started but have not timedout nor called-back. * * @returns {integer} */ this.getActiveCount = function() { return active; }; /** * Get the amount of requests in queue when concurrency is enabled. * * @returns {integer} */ this.getQueuedCount = function() { return queue.length; }; /** * Get this instance's name * * @returns {string} */ this.getName = function() { return opts.name; }; function recordError(error) { interval.addError(error); rollingCounts.addError(error); lastErrorMs = Date.now(); testing = false; } function reset() { rollingCounts.reset(); lastErrorMs = null; testing = false; } /** * Get current error percentage. * * @returns {float} */ this.getErrorPercentage = function() { return rollingCounts.getCurrentErrorLevel(); }; /** * Get current circuit's [state]{@link CircuitBreaker.STATE}. * * @returns {string} */ this.getState = function() { if (opts.volumeThreshold && rollingCounts.getCurrent().total < opts.volumeThreshold) { return STATE.CLOSED; } var errorLevel = rollingCounts.getCurrentErrorLevel(); var isHalfOpen = !testing && opts.resetTime && lastErrorMs && Date.now() - lastErrorMs > opts.resetTime; if (errorLevel && errorLevel >= opts.errorThreshold) { if (isHalfOpen) { return STATE.HALF_OPEN; } return STATE.OPEN; } var namedErrorLevels = rollingCounts.getCurrentNamedErrorLevels(opts.errorNamesThresholds); for (var name in namedErrorLevels) { if (namedErrorLevels[name] && namedErrorLevels[name] >= opts.errorNamesThresholds[name]) { if (isHalfOpen) { return STATE.HALF_OPEN; } return STATE.OPEN; } } return STATE.CLOSED; }; this.toString = function() { return "[object CircuitBreaker " + name + " (" + self.getState() + ")]"; }; function rollInterval(firstTime) { var now = Date.now(); if (!firstTime && opts.emitIntervalEvent) { interval.end = now; interval.state = self.getState(); interval.active = active; interval.queued = queue.length; /** * Interval event * * @event CircuitBreaker#interval * @type object * @property {integer} start - Timestamp of start of the interval * @property {integer} end - Timestamp of the end of the interval * @property {string} state - Circuit's current state * @property {integer} active - Calls currently running * @property {integer} queued - Calls currently queued * @property {integer} total - Total calls * @property {integer} success - Total success calls * @property {integer} totalErrors - Total failed calls * @property {object} errors - Error counts by type * @property {integer} errors.TimeoutError - Total calls which timedout - if any * @property {integer} errors.OpenCircuitError - Total calls which were rejected (short-circuit) - if any * @property {array} times - Times of <b>successful</b> calls in milliseconds */ self.emit('interval', interval); } interval = makeInterval(now); rollIntervalTimeoutObj = setTimeout(rollInterval, opts.intervalSize); } function rollWindow(firstTime) { if (!firstTime) { rollingCounts.roll(); } rollWindowTimeoutObj = setTimeout(rollWindow, opts.windowSize); } /** * Start events. Events starts automatically upon the first request. * This forces start emitting events before any request is made. */ this.startEvents = function() { if (opts.intervalSize && !rollIntervalTimeoutObj) { rollInterval(true); } if (opts.windowSize && !rollWindowTimeoutObj) { rollWindow(true); } }; /** * Stop events. Clears the pending timeouts. */ this.stopEvents = function() { if (rollIntervalTimeoutObj) { clearTimeout(rollIntervalTimeoutObj); } if (rollWindowTimeoutObj) { clearTimeout(rollWindowTimeoutObj); } }; /** * Bluebird Promise module * @member {module} */ this.Promise = opts.Promise; /** * Main entry point. Call the protected function and return a promise. * * @returns {external:Promise} */ this.exec = function() { var args = Array.prototype.slice.call(arguments); self.startEvents(); if (opts.concurrency && active >= opts.concurrency) { return queueRequest(args); } else { return runRequest(args); } }; function queueRequest(args) { var resolver = defer(); queue.unshift({args: args, resolver: resolver}); return resolver.promise; } function runNextInQueue() { if (queue.length === 0) { return; } var queued = queue.pop(); process.nextTick(function() { runRequest(queued.args, queued.resolver); }); } function runRequest(callArgs, resolver) { resolver = resolver || defer(); var state = self.getState(); var timedout = false; var start = Date.now(); interval.total++; if (state == STATE.OPEN) { var error = new OpenCircuitError(); rollingCounts.addError(error); interval.addError(error); resolver.reject(error); return resolver.promise; } else if (state == STATE.HALF_OPEN) { testing = true; } active += 1; var timeoutObject = null; if (opts.timeout > 0) { timeoutObject = setTimeout(function() { timedout = true; var error = new TimeoutError("Timed out after " + opts.timeout + " ms"); active -= 1; recordError(error); resolver.reject(error); runNextInQueue(); }, opts.timeout); } var cb = function() { var args = Array.prototype.slice.call(arguments); if (timeoutObject) { clearTimeout(timeoutObject); } if (timedout) { return; } active -= 1; var now = Date.now(); var err = opts.isErrorHandler.apply(null, args); if (opts.emitCallbackEvent) { /** * Callback event. Emitted on calls which do not timeout. * * @event CircuitBreaker#callback * @type object * @property {array} args - Array or arguments in exec() call * @property {integer} start - Timestamp of the start of the call * @property {integer} end - Timestamp of the end of the call * @property {array} result - Array of arguments with which callback was called */ self.emit('callback', { args: callArgs.slice(0, -1), start: start, end: now, result: args }); } if (err) { recordError(err); resolver.reject(err); runNextInQueue(); return; } interval.success++; interval.times.push(now - start); rollingCounts.addSuccess(); if (testing) { reset(); } // remove error args.shift(); // if only one argument, resolve promise without array resolver.resolve(args.length == 1 ? args[0] : args); runNextInQueue(); }; callArgs.push(cb); // TODO fn THIS fn.apply(fnThis, callArgs); return resolver.promise; } } util.inherits(CircuitBreaker, EventEmitter); /** * State enum * @readonly * @enum {string} */ CircuitBreaker.STATE = { /** * No calls are allowed */ OPEN: 'open', /** * Calls are allowed */ CLOSED: 'closed', /** * One call is allowed to test health */ HALF_OPEN: 'half-open' }; var STATE = Object.freeze(CircuitBreaker.STATE); /** * Error returned when circuit is open. * @constructor * @extends external:Error * @param {string} message */ OpenCircuitError = function(message) { this.name = "OpenCircuitError"; this.message = message || "Circuit Breaker was tripped"; }; OpenCircuitError.prototype = new Error(); OpenCircuitError.prototype.constructor = OpenCircuitError; /** * Error returned when a call timesout * @constructor * @extends external:Error * @param {string} message */ TimeoutError = function(message) { this.name = "TimeoutError"; this.message = message || "Timed out"; }; TimeoutError.prototype = new Error(); TimeoutError.prototype.constructor = TimeoutError; /** * A circuit breaker which wraps the [request module]{@link https://www.npmjs.org/package/request}. * All options get passed to the CircuitBreaker (except the request option). * * @constructor * @extends CircuitBreaker * @param {object} options * @param {function} options.request - Request module (default: require('request')). */ function RequestCircuitBreaker(options) { options = options || {}; /** * @external request * @see https://github.com/mikeal/request */ var request = options.request || require('request'); delete options.request; CircuitBreaker.call(this, request, null, options); } util.inherits(RequestCircuitBreaker, CircuitBreaker); module.exports = CircuitBreaker; module.exports.RequestCircuitBreaker = RequestCircuitBreaker; module.exports.OpenCircuitError = OpenCircuitError; module.exports.TimeoutError = TimeoutError; module.exports.CBObserver = require('./observer'); module.exports.CBStats = require('./stats');