// Source: circuitbreaker.js

var _ = require('lodash'),
    util = require('util'),
    Bluebird = require('bluebird'),
    EventEmitter = require('events').EventEmitter,
    Counts = require('./counts').Counts,
    RollingCounts = require('./counts').RollingCounts;

// Default option values for CircuitBreaker. Any option the caller leaves
// undefined is filled in from this object. All times are in milliseconds.
var defaults = {
    /**
     * Bluebird Promise
     * @constructor
     * @external Promise
     * @see {@link https://github.com/petkaantonov/bluebird/blob/master/API.md#core}
     */
    Promise: Bluebird,

    // Instance name, returned by getName().
    name: "CircuitBreaker",
    // Called with the callback's arguments; a truthy return value is treated as the error.
    isErrorHandler: isError,
    // Maximum number of simultaneously active calls; 0 means unlimited.
    concurrency: 0,
    // Time allowed for the callback to be called; a timer is only armed when this is > 0.
    timeout: 3000,
    // Time to wait after the last error before a test call is allowed on a tripped circuit.
    resetTime: 1000,
    // Minimum number of calls in the health window before the circuit is checked for health.
    volumeThreshold: 10,
    // Period of the 'interval' reporting event.
    intervalSize: 1000,
    // Duration of each individual rolling window.
    windowSize: 1000,
    // Number of rolling windows kept in the health window.
    windowCount: 10,
    // Overall error level (0..1) at which the circuit is tripped.
    errorThreshold: 0.05,
    // Per-error-name thresholds (0..1), keyed by the error's name.
    errorNamesThresholds: {},
    // Whether the 'interval' event is emitted.
    emitIntervalEvent: true,
    // Whether the 'callback' event is emitted.
    emitCallbackEvent: false
};

/**
 * Default isErrorHandler: treats the callback's first argument as the error.
 *
 * @param {*} error - First argument the wrapped function passed to its callback
 * @returns {*} The same value, unchanged; a truthy result marks the call as failed
 */
function isError(error) {
    // Node-style callbacks carry the error in the first position, so it is
    // handed back as-is for the caller's truthiness check.
    return error;
}

/**
 * Create a new circuit breaker wrapping function *fn*.  If the function
 * is a method of an object, pass the object as the *fnThis* parameter.
 *
 * You can configure its behaviour with the *options* parameter, or through
 * various setters:
 *
 * @example
 * var cb = new CircuitBreaker(fn)
 *          .setTimeout(1000)
 *          .setErrorThreshold(0.5);
 *
 * @fires CircuitBreaker#interval
 * @fires CircuitBreaker#callback
 *
 * @constructor
 * @param {function} fn - The function to be executed
 * @param {object=} fnThis - If provided, call fn with fnThis as this argument
 * @param {object=} options - Options. All times are in milliseconds.
 * @param {function} options.Promise - Promise library (default: require('bluebird') / you can use the native Promise constructor)
 * @param {string} options.name - Name for this instance (default: 'CircuitBreaker')
 * @param {function} options.isErrorHandler - Function to get error from callback
 * @param {integer} options.concurrency - Set a limit for concurrent calls (default: 0 - unlimited)
 * @param {integer} options.timeout - Timeout limit for callback to be called (default: 3000)
 * @param {integer} options.resetTime - When circuit is tripped, time to wait before a test call is allowed (default: 1000)
 * @param {integer} options.volumeThreshold - Minimum number of calls in health window before circuit is checked
 *                                            for health. Circuit remains <b>closed</b> until then (default: 10)
 * @param {integer} options.intervalSize - Frequency with which the 'interval' event is emitted for reporting (default: 1000)
 * @param {integer} options.windowSize - Time size of each individual rolling window (default: 1000)
 * @param {integer} options.windowCount - Number of windows in health window (default: 10)
 * @param {float} options.errorThreshold - Error level between 0 and 1 at which the circuit is tripped (default: 0.05)
 * @param {object} options.errorNamesThresholds - Custom error levels for particular named errors (default: {})
 * @param {boolean} options.emitIntervalEvent - Enable/disable emitting the 'interval' event for reporting (default: true)
 * @param {boolean} options.emitCallbackEvent - Enable/disable emitting the 'callback' event (default: false)
 */
function CircuitBreaker(fn, fnThis, options) {
    // Resolve the effective options on a fresh object so that neither the
    // caller's options object nor the module-level defaults are modified.
    var opts = _.defaults({}, options || {}, defaults);

    // setErrorNameThreshold() writes into this map, so every instance gets its
    // own copy; otherwise thresholds would leak between breakers sharing the
    // same (default or caller-supplied) object.
    opts.errorNamesThresholds = _.extend({}, opts.errorNamesThresholds);

    var active = 0,               // calls started but not yet called back / timed out
        queue = [],               // pending {args, resolver} entries when concurrency is limited
        lastErrorMs = null,       // timestamp of the last recorded error
        testing = false,          // true while a half-open test call is in flight
        rollingCounts = new RollingCounts(opts.windowCount),
        interval = makeInterval(),
        self = this,
        rollIntervalTimeoutObj = null,
        rollWindowTimeoutObj = null;

    // Build a {promise, resolve, reject} triple using the configured Promise library.
    function defer() {
        var resolve, reject;
        var promise = new opts.Promise(function(resolveFn, rejectFn) {
            resolve = resolveFn;
            reject = rejectFn;
        });
        return {
            resolve: resolve,
            reject: reject,
            promise: promise
        };
    }

    // Create a fresh reporting interval: a Counts object extended with the
    // interval start timestamp and the list of successful call durations.
    function makeInterval(now) {
        now = now || Date.now();
        return _.extend(new Counts(), {
            start: now,
            times: []
        });
    }

    /**
     * Set the timeout in milliseconds. Set to 0 to disable.
     *
     * @param {integer} ms
     * @returns {CircuitBreaker}
     */
    this.setTimeout = function(ms) {
        opts.timeout = ms;
        return self;
    };

    /**
     * Set the minimum amount of calls to start checking for circuit health.
     * Circuit will always be closed until it reaches this threshold.
     * Set to 0 to instantly start checking for circuit health.
     *
     * @param {integer} count - Minimum amount of calls to start checking circuit health.
     * @returns {CircuitBreaker}
     */
    this.setVolumeThreshold = function(count) {
        opts.volumeThreshold = count;
        return self;
    };

    /**
     * Set the error threshold which triggers the circuit.
     * Set to 0 to open the circuit on first error.
     *
     * @param {float} level - Float from 0 to 1
     * @returns {CircuitBreaker}
     */
    this.setErrorThreshold = function(level) {
        opts.errorThreshold = level;
        return self;
    };

    /**
     * Set the error threshold for a particular error type.
     * The error type is obtained from the name property of the error.
     *
     * @param {string} name - Error name
     * @param {float} level - Float from 0 to 1
     * @returns {CircuitBreaker}
     */
    this.setErrorNameThreshold = function(name, level) {
        opts.errorNamesThresholds[name] = level;
        return self;
    };

    /**
     * Set the sleep period until a test request is allowed to be made.
     * The circuit will remain open during this period of time.
     *
     * @param {integer} ms
     * @returns {CircuitBreaker}
     */
    this.setResetTime = function(ms) {
        opts.resetTime = ms;
        return self;
    };

    /**
     * Set the concurrency level. Set to 0 to disable (default behaviour).
     *
     * Requests will be queued if more than this number are currently active.
     *
     * @param {integer} num - The maximum number of requests allowed to be active.
     * @returns {CircuitBreaker}
     */
    this.setConcurrency = function(num) {
        opts.concurrency = num;
        return self;
    };

    /**
     * Set the reporting interval size.
     *
     * @param {integer} ms - The frequency with which the 'interval' event is emitted.
     * @returns {CircuitBreaker}
     */
    this.setIntervalSize = function(ms) {
        opts.intervalSize = ms;
        return self;
    };

    /**
     * Set the size in milliseconds of each individual window used in the rolling counts.
     *
     * @param {integer} ms
     * @returns {CircuitBreaker}
     */
    this.setWindowSize = function(ms) {
        opts.windowSize = ms;
        return self;
    };

    /**
     * Set the total amount of windows to keep in the rolling counts.
     *
     * @param {integer} count
     * @returns {CircuitBreaker}
     */
    this.setWindowCount = function(count) {
        // Keep the stored option in step with the rolling counts' size.
        opts.windowCount = count;
        rollingCounts.setSize(count);
        return self;
    };

    /**
     * Enables/disables emitting the 'interval' event for reporting.
     *
     * @param {boolean} bool
     * @returns {CircuitBreaker}
     */
    this.setEmitIntervalEvent = function(bool) {
        opts.emitIntervalEvent = bool ? true : false;
        return self;
    };

    /**
     * Enables/disables emitting the 'callback' event for reporting.
     *
     * @param {boolean} bool
     * @returns {CircuitBreaker}
     */
    this.setEmitCallbackEvent = function(bool) {
        opts.emitCallbackEvent = bool ? true : false;
        return self;
    };

    /**
     * Set a custom function to check if the callback was called with an error or not.
     *
     *
     * @example
     * var cb = new RequestCircuitBreaker();
     * cb.setIsErrorHandler(function(error, response, body) {
     *     if (error) return error;
     *     if (response.statusCode == 503) {
     *         var unavailableError = new Error();
     *         unavailableError.name = "ServiceUnavailableError";
     *         return unavailableError;
     *     }
     *     return null;
     * });
     *
     * @param {function} cb - A function which will be called with the callback arguments.
     * @returns {CircuitBreaker}
     */
    this.setIsErrorHandler = function(cb) {
        opts.isErrorHandler = cb;
        return self;
    };

    /**
     * Get the current reporting interval.
     *
     * @returns {object} A deep copy of the interval being accumulated
     */
    this.getCurrentInterval = function() {
        return _.cloneDeep(interval);
    };

    /**
     * Misspelled alias of getCurrentInterval, kept for backward compatibility.
     *
     * @deprecated
     * @returns {object}
     */
    this.getCurrentInteval = function() {
        return self.getCurrentInterval();
    };

    /**
     * Get the current circuit breaker health rolling counts
     *
     * @returns {object}
     */
    this.getCurrentCounts = function() {
        return _.cloneDeep(rollingCounts.getCurrent());
    };

    /**
     * Get the amount of current active requests. That is,
     * calls which have started but have not timed out nor called back.
     *
     * @returns {integer}
     */
    this.getActiveCount = function() {
        return active;
    };

    /**
     * Get the amount of requests in queue when concurrency is enabled.
     *
     * @returns {integer}
     */
    this.getQueuedCount = function() {
        return queue.length;
    };

    /**
     * Get this instance's name
     *
     * @returns {string}
     */
    this.getName = function() {
        return opts.name;
    };

    // Count a failed call in both the reporting interval and the health
    // window, remember when it happened and end any half-open test.
    function recordError(error) {
        interval.addError(error);
        rollingCounts.addError(error);
        lastErrorMs = Date.now();
        testing = false;
    }

    // Clear the health window after a successful half-open test call.
    function reset() {
        rollingCounts.reset();
        lastErrorMs = null;
        testing = false;
    }

    /**
     * Get current error percentage.
     *
     * @returns {float}
     */
    this.getErrorPercentage = function() {
        return rollingCounts.getCurrentErrorLevel();
    };

    /**
     * Get current circuit's [state]{@link CircuitBreaker.STATE}.
     *
     * @returns {string}
     */
    this.getState = function() {
        // Below the volume threshold the circuit is never checked for health.
        if (opts.volumeThreshold && rollingCounts.getCurrent().total < opts.volumeThreshold) {
            return STATE.CLOSED;
        }
        var errorLevel = rollingCounts.getCurrentErrorLevel();
        // A tripped circuit becomes half-open once resetTime has elapsed since
        // the last error, unless a test call is already in flight.
        var isHalfOpen = !testing && opts.resetTime && lastErrorMs && Date.now() - lastErrorMs > opts.resetTime;
        if (errorLevel && errorLevel >= opts.errorThreshold) {
            if (isHalfOpen) {
                return STATE.HALF_OPEN;
            }
            return STATE.OPEN;
        }
        // The overall level is fine; check the per-error-name thresholds.
        var namedErrorLevels = rollingCounts.getCurrentNamedErrorLevels(opts.errorNamesThresholds);
        for (var name in namedErrorLevels) {
            if (namedErrorLevels[name] && namedErrorLevels[name] >= opts.errorNamesThresholds[name]) {
                if (isHalfOpen) {
                    return STATE.HALF_OPEN;
                }
                return STATE.OPEN;
            }
        }
        return STATE.CLOSED;
    };

    /**
     * Human readable description containing the instance name and current state.
     *
     * @returns {string}
     */
    this.toString = function() {
        return "[object CircuitBreaker " + opts.name + " (" + self.getState() + ")]";
    };

    // Close the current reporting interval (emitting it when enabled), start a
    // new one and schedule the next roll. firstTime suppresses the emit.
    function rollInterval(firstTime) {
        var now = Date.now();
        if (!firstTime && opts.emitIntervalEvent) {
            interval.end = now;
            interval.state = self.getState();
            interval.active = active;
            interval.queued = queue.length;

            /**
             * Interval event
             *
             * @event CircuitBreaker#interval
             * @type object
             * @property {integer} start - Timestamp of start of the interval
             * @property {integer} end - Timestamp of the end of the interval
             * @property {string} state - Circuit's current state
             * @property {integer} active - Calls currently running
             * @property {integer} queued - Calls currently queued
             * @property {integer} total - Total calls
             * @property {integer} success - Total success calls
             * @property {integer} totalErrors - Total failed calls
             * @property {object} errors - Error counts by type
             * @property {integer} errors.TimeoutError - Total calls which timedout - if any
             * @property {integer} errors.OpenCircuitError - Total calls which were rejected (short-circuit) - if any
             * @property {array} times - Times of <b>successful</b> calls in milliseconds
             */
            self.emit('interval', interval);
        }
        interval = makeInterval(now);
        rollIntervalTimeoutObj = setTimeout(rollInterval, opts.intervalSize);
    }

    // Advance the rolling health window and schedule the next roll.
    function rollWindow(firstTime) {
        if (!firstTime) {
            rollingCounts.roll();
        }
        rollWindowTimeoutObj = setTimeout(rollWindow, opts.windowSize);
    }

    /**
     * Start events. Events starts automatically upon the first request.
     * This forces start emitting events before any request is made.
     */
    this.startEvents = function() {
        if (opts.intervalSize && !rollIntervalTimeoutObj) {
            rollInterval(true);
        }

        if (opts.windowSize && !rollWindowTimeoutObj) {
            rollWindow(true);
        }
    };

    /**
     * Stop events. Clears the pending timeouts.
     */
    this.stopEvents = function() {
        // The handles are reset as well as cleared: startEvents() only arms a
        // timer whose handle is falsy, so a stale handle would block restarts.
        if (rollIntervalTimeoutObj) {
            clearTimeout(rollIntervalTimeoutObj);
            rollIntervalTimeoutObj = null;
        }
        if (rollWindowTimeoutObj) {
            clearTimeout(rollWindowTimeoutObj);
            rollWindowTimeoutObj = null;
        }
    };

    /**
     * Promise constructor used by this instance (Bluebird unless overridden
     * through options.Promise).
     * @member {module}
     */
    this.Promise = opts.Promise;

    /**
     * Main entry point. Call the protected function and return a promise.
     * The arguments are forwarded to the protected function, followed by a
     * callback supplied by the circuit breaker.
     *
     * @returns {external:Promise}
     */
    this.exec = function() {
        var args = Array.prototype.slice.call(arguments);

        self.startEvents();

        if (opts.concurrency && active >= opts.concurrency) {
            return queueRequest(args);
        } else {
            return runRequest(args);
        }
    };

    // Park a request until a running one finishes. New entries go to the
    // front and runNextInQueue() pops from the back, giving FIFO order.
    function queueRequest(args) {
        var resolver = defer();
        queue.unshift({args: args, resolver: resolver});
        return resolver.promise;
    }

    // Schedule the oldest queued request, if any, on the next tick.
    function runNextInQueue() {
        if (queue.length === 0) {
            return;
        }
        var queued = queue.pop();
        process.nextTick(function() {
            runRequest(queued.args, queued.resolver);
        });
    }

    // Execute one request: short-circuit when the circuit is open, otherwise
    // call fn with a callback appended and settle the promise from the
    // callback or from the timeout, whichever comes first.
    function runRequest(callArgs, resolver) {

        resolver = resolver || defer();
        var state = self.getState();
        var settled = false;      // set once the call timed out or called back
        var start = Date.now();

        interval.total++;

        if (state === STATE.OPEN) {
            var error = new OpenCircuitError();
            rollingCounts.addError(error);
            interval.addError(error);
            resolver.reject(error);
            // Nothing became active, so no later completion would wake the
            // queue; keep draining it so queued requests are not left pending.
            runNextInQueue();
            return resolver.promise;
        } else if (state === STATE.HALF_OPEN) {
            testing = true;
        }

        active += 1;

        var timeoutObject = null;
        if (opts.timeout > 0) {
            timeoutObject = setTimeout(function() {
                settled = true;
                var error = new TimeoutError("Timed out after " + opts.timeout + " ms");
                active -= 1;
                recordError(error);
                resolver.reject(error);
                runNextInQueue();
            }, opts.timeout);
        }

        var cb = function() {
            var args = Array.prototype.slice.call(arguments);

            if (timeoutObject) {
                clearTimeout(timeoutObject);
            }

            // Ignore callbacks arriving after the timeout, and repeated
            // callbacks, so the call is only accounted for once.
            if (settled) {
                return;
            }
            settled = true;

            active -= 1;
            var now = Date.now();

            var err = opts.isErrorHandler.apply(null, args);

            if (opts.emitCallbackEvent) {
                /**
                 * Callback event. Emitted on calls which do not timeout.
                 *
                 * @event CircuitBreaker#callback
                 * @type object
                 * @property {array} args - Array or arguments in exec() call
                 * @property {integer} start - Timestamp of the start of the call
                 * @property {integer} end - Timestamp of the end of the call
                 * @property {array} result - Array of arguments with which callback was called
                 */
                self.emit('callback', {
                    // drop the trailing callback appended below
                    args: callArgs.slice(0, -1),
                    start: start,
                    end: now,
                    result: args
                });
            }

            if (err) {
                recordError(err);
                resolver.reject(err);
                runNextInQueue();
                return;
            }

            interval.success++;
            interval.times.push(now - start);
            rollingCounts.addSuccess();

            // A successful call while testing closes the circuit again.
            if (testing) {
                reset();
            }

            // remove error
            args.shift();
            // if only one argument, resolve promise without array
            resolver.resolve(args.length === 1 ? args[0] : args);
            runNextInQueue();
        };

        callArgs.push(cb);
        // TODO fn THIS
        fn.apply(fnThis, callArgs);

        return resolver.promise;
    }

}
// Give CircuitBreaker instances the EventEmitter API; the constructor uses
// self.emit() for the 'interval' and 'callback' events.
util.inherits(CircuitBreaker, EventEmitter);

/**
 * State enum
 * @readonly
 * @enum {string}
 */
CircuitBreaker.STATE = Object.freeze({
    /**
     * Circuit is tripped: no calls are allowed
     */
    OPEN: 'open',

    /**
     * Normal operation: calls are allowed
     */
    CLOSED: 'closed',

    /**
     * A single call is allowed through to test health
     */
    HALF_OPEN: 'half-open'
});
// Module-local shorthand for the same frozen enum object.
var STATE = CircuitBreaker.STATE;

/**
 * Error returned when circuit is open.
 * @constructor
 * @extends external:Error
 * @param {string} message
 */
// Declared as a module-scoped function (not an implicit global) so the module
// also loads under strict mode.
function OpenCircuitError(message) {
    this.name = "OpenCircuitError";
    this.message = message || "Circuit Breaker was tripped";
    // Capture the stack per instance, at the point where the error is
    // created. name/message are set first so they appear in the stack header.
    if (typeof Error.captureStackTrace === 'function') {
        Error.captureStackTrace(this, OpenCircuitError);
    } else {
        this.stack = (new Error(this.message)).stack;
    }
}
// Inherit from Error without instantiating one: an Error instance used as the
// prototype would share a single load-time stack with every instance.
OpenCircuitError.prototype = Object.create(Error.prototype);
OpenCircuitError.prototype.constructor = OpenCircuitError;

/**
 * Error returned when a call times out
 * @constructor
 * @extends external:Error
 * @param {string} message
 */
// Declared as a module-scoped function (not an implicit global) so the module
// also loads under strict mode.
function TimeoutError(message) {
    this.name = "TimeoutError";
    this.message = message || "Timed out";
    // Capture the stack per instance, at the point where the error is
    // created. name/message are set first so they appear in the stack header.
    if (typeof Error.captureStackTrace === 'function') {
        Error.captureStackTrace(this, TimeoutError);
    } else {
        this.stack = (new Error(this.message)).stack;
    }
}
// Inherit from Error without instantiating one: an Error instance used as the
// prototype would share a single load-time stack with every instance.
TimeoutError.prototype = Object.create(Error.prototype);
TimeoutError.prototype.constructor = TimeoutError;


/**
 * A circuit breaker which wraps the [request module]{@link https://www.npmjs.org/package/request}.
 * All options get passed to the CircuitBreaker (except the request option).
 *
 * @constructor
 * @extends CircuitBreaker
 * @param {object} options
 * @param {function} options.request - Request module (default: require('request')).
 */
function RequestCircuitBreaker(options) {
    // Work on a shallow copy: the request entry is stripped before the options
    // are handed to CircuitBreaker, and the caller's object must keep it so
    // the same options can be reused for another breaker.
    var breakerOptions = _.extend({}, options);
    /**
     * @external request
     * @see https://github.com/mikeal/request
     */
    // The request module is only loaded when the caller did not supply one.
    var request = breakerOptions.request || require('request');
    delete breakerOptions.request;
    // request is wrapped as a plain function (no this binding).
    CircuitBreaker.call(this, request, null, breakerOptions);
}
// RequestCircuitBreaker instances inherit from CircuitBreaker.prototype.
util.inherits(RequestCircuitBreaker, CircuitBreaker);


// The constructor itself is the module's export; the related classes and
// helpers are attached to it as properties.
module.exports = CircuitBreaker;
module.exports.RequestCircuitBreaker = RequestCircuitBreaker;
module.exports.OpenCircuitError = OpenCircuitError;
module.exports.TimeoutError = TimeoutError;
// NOTE(review): these sibling modules are required only after module.exports
// has been assigned — presumably so they can require this module back; confirm
// before reordering.
module.exports.CBObserver = require('./observer');
module.exports.CBStats = require('./stats');