Funktionale Reaktive Programmierung mit Bacon.js

angularjs reactive observables | Ramy Hardan | 20 Minuten

Dieser Artikel soll Euch Funktionale Reaktive Programmierung näher bringen und in Verbindung mit AngularJS schmackhaft machen, damit Ihr komplexe Interaktionen mit einfachem, kompaktem Code umsetzen könnt.

Was ist Funktionale Reaktive Programmierung?

Wer schon Formeln in Excel verwendet hat oder Databinding in AngularJS nutzt, kam bereits in den Genuss reaktiver Programmierung. Wir brauchen uns nicht selbst um die Aktualisierung des Zustands bei Änderungen zu kümmern, sondern überlassen diese Aufgabe der Plattform. Voraussetzung ist, dass wir ihr vorher die Abhängigkeiten zwischen den Daten beibringen.

In AngularJS definieren wir Abhängigkeiten zwischen Scope (Model) und DOM (View) meist über Direktiven, z.B. ngModel:

<input ng-model="username" type="text">

Damit haben wir AngularJS beigebracht, das Eingabefeld zu beobachten und alle Änderungen an den Scope weiterzuleiten und dort den username entsprechend aufzufrischen (ebenso in umgekehrter Richtung). Das Entscheidende dabei ist, dass es sich nicht um einen One-Night-Stand handelt, sondern um eine dauerhafte Beziehung: Jeder beobachtet den anderen und reagiert, wenn sich etwas beim Partner ändert.

Hier sind wir schon im Kern reaktiver Programmierung: Sie konzentriert sich auf Zustandsänderungen über die Zeit hinweg und nicht nur auf den aktuellen Zustand.

Etwas abstrakter ließe sich das obige Beispiel so schreiben:

scope.username = dom.input

Im nicht-reaktiven Zusammenhang wäre dies eine einfache Zuweisung: “Weise der Variablen scope.username einmalig den aktuellen Wert der Variablen dom.input zu.”

Bei reaktiver Programmierung bedeutet der Ausdruck: “Beobachte ständig jede Änderung in dom.input und leite sie an scope.username weiter.” Wegen dieser Eigenschaft hat sich die Bezeichnung Observable für solche dynamischen Datentypen durchgesetzt. Das Observable ist der funktionale Bruder des objektorientierten Observer-Entwurfsmusters.

Die Werte eines Observable sind also nie einfach, sondern immer Ströme von Werten über die Zeit. Wenn ich z.B. Ramy in das Eingabefeld tippe, dann könnte dom.input den Wert ...'R'....'a'..'m'...'y'.. haben, je nachdem, wann ich die Tasten drücke. Die Punkte entsprechen jeweils einer Zeiteinheit.

  Synchron Asynchron
Ein Wert Bild
Datentyp: z.B. Number
Bild
Datentyp: Promise/Future
Viele Werte Bild
Datentyp: Array/Object
Bild
Datentyp: Observable
     

Das war der reaktive Teil, es fehlt noch der funktionale. Auf den Ereignisströmen lassen sich Funktionen definieren, deren Ergebnisse wiederum Ereignisströme sind. So können wir Ströme kombinieren und zu einer “Kanalisation” ausbauen, um komplexes Verhalten abzubilden.

Wie das konkret in JavaScript aussieht, schauen wir uns am Beispiel von Bacon.js an.

Was ist Bacon.js?

Bacon.js ermöglicht Funktionale Reaktive Programmierung in JavaScript. Beginnen wir mit einem Beispiel:

var tickStream = Bacon.interval(500, 1);

Der Wert von tickStream ist ein EventStream, in dem alle 500 Millisekunden eine 1 erscheint.

....1....1....1....

Bacon.js liefert viele Funktionen, die Komposition von Strömen ermöglichen. Aus dem tickStream wollen wir nun einen neuen Strom erzeugen, der abwechselnd 0 und 1 enthält, statt nur 1:

var tickStream = Bacon.interval(500, 1);
var aggregator = function(aggregate, tick) {
  return aggregate !== tick ? 1 : 0
}
var ticktackStream = tickStream.scan(0, aggregator);

Die Funktion scan aggregiert Werte eines Stroms. Dazu übergibt man ihr einen Startwert und eine Aggregierungsfunktion. Die Aggregierungsfunktion erhält als Argumente das Ergebnis ihres letzten Aufrufs (bzw. den Startwert beim ersten Aufruf) und den aktuellen Wert des Stroms. In unserem Beispiel sehen die Aufrufe von aggregator so aus:

(0, 1) -> 1  // Startwert von scan,         1. Wert aus tickStream
(1, 1) -> 0  // 1. Ergebnis von aggregator, 2. Wert aus tickStream
(0, 1) -> 1  // 2. Ergebnis von aggregator, 3. Wert aus tickStream

usw.

Unsere beiden Ströme enthalten also folgende Werte:

    tickStream:      ....1....1....1....
    ticktackStream: 0....1....0....1....

Das Ergebnis von scan ist streng genommen kein EventStream, sondern eine Property. Der Unterschied ist, dass eine Property einen aktuellen Wert (und ggf. einen Startwert) besitzt. Sie entspricht dem Konzept der überwachten Properties im Scope und ermöglicht die leichte Integration mit AngularJS, wie wir später sehen werden.

Da ticktackStream einen aktuellen Wert hat, benennen wir ihn um und korrigieren unser Schaubild:

    tickStream:    ....1....1....1....
    ticktackProp: 00000111110000011111

EventStream und Property sind lediglich unterschiedliche Sichtweisen auf veränderliche Werte. Anfangs verwirrt diese Unterscheidung eher, die jeweiligen Einsatzzwecke werden aber mit der Zeit klarer. Mit eventStream.toProperty() bzw. property.changes() können wir beruhigenderweise immer eine Sicht gegen die andere tauschen.

Im nächsten Abschnitt verheiraten wir Bacon.js mit AngularJS und erwecken ein lange und schmerzlich vermisstes HTML-Element wieder zum Leben.

Wie bleiben wir mit AngularJS im Fluss?

Glücklicherweise ebnet uns das Modul angular-bacon einen bequemen Weg, AngularJS und Bacon.js miteinander zu verbinden:

var baconProperty = $scope.$watchAsProperty('scopePropertyName') // AngularJS -> Bacon.js
baconPropertyOrStream.digest($scope, 'scopePropertyName')        // Bacon.js -> AngularJS

So können wir z.B. in unseren AngularJS-Controllern Properties aus dem Scope als Bacon.js-Properties nutzen und reaktiv weiterprogrammieren. Betten wir unseren bisherigen Bacon.js-Code also in einen Controller ein und binden den aktuellen Wert von ticktackProp an eine Scope-Property ticktack:

angular.module( 'baconDemo', ['angular-bacon'] ).controller( 'baconCtrl', function($scope) {
    var tickStream = Bacon.interval(500, 1);
    var aggregator = function(aggregate, tick) {
      return aggregate !== tick ? 1 : 0
    };
    var ticktackProp = tickStream.scan(0, aggregator);

    ticktackProp.digest($scope, 'ticktack');
});

Da ticktack jetzt im Scope liegt, können wir sie wie gewohnt in AngularJS nutzen:

<p ng-controller="baconCtrl">
  Ticktack: {{ticktack}}
</p>

Sogar Google vermisst <blink>. Erwecken wir es also wieder zum Leben. Dazu erstellen wir eine Direktive blink, deren Sichtbarkeit wir über ng-style mit Hilfe unserer ticktackProp wechseln.

Im ersten Schritt erzeugen wir aus unserer ticktackProp mundgerechte CSS-Happen für ng-style und binden sie an den Scope:

var toVisibility = function(ticktackVal) {
  return ticktackVal ? {visibility:'visible'} : {visibility: 'hidden'};
};
var blinkProp = ticktackProp.map(toVisibility);

blinkProp.digest($scope, 'blinkCss');

Die von map erzeugte neue Bacon.js-Property blinkProp enthält alle mit der Funktion toVisibility abgebildeten Werte aus ticktackProp:

ticktackProp: 00000111110000011111
blinkProp:    hhhhhvvvvvhhhhhvvvvv

Im Scope haben wir nun eine Property blinkCss mit CSS-Daten, die wir für unsere Direktive nutzen:

angular.module('baconDemo').directive( 'blink', function() {
  return {
    restrict: 'E',
    transclude: true,
    template: '<div ng-style="blinkCss" ng-transclude></div>',
  }
});

Jetzt steht uns nichts mehr im Weg auf unserer Reise zu den Ursprüngen der Webgestaltung:

<p ng-controller="baconCtrl">
  <blink>This should blink</blink>
  This shouldn't
  <blink>This should blink, too</blink>
</p>

Plunkr in neuem Fenster öffnen

Es nervt auf Dauer, bitte ausschalten!

Geben wir nun dem Nutzer die Möglichkeit, das Gezappel auf dem Bildschirm mit einem Button ab- und anzuschalten. Dazu definieren wir einen Button:

<p ng-controller="baconCtrl">
  <button ng-click="shouldBlink = !shouldBlink">Blink: {{shouldBlink}}</button>
  <blink>This should blink</blink>
  This shouldn't
  <blink>This should blink, too</blink>
</p>

Ich habe mich hier für eine schmutzige Abkürzung entschieden und die Button-Logik gleich im ng-click-Attribut implementiert. Der Button setzt also beim Klicken shouldBlink im Scope abwechselnd auf true oder false.

Um das Blinken zu stoppen, setzen wir im ersten Versuch gleich an der Wurzel an und filtern den tickStream, damit er nur tickt, wenn shouldBlink den Wert true hat.

.controller( 'baconCtrl', function($scope) {
  $scope.shouldBlink = true;
  shouldBlinkProp = $scope.$watchAsProperty('shouldBlink');
  var tickStream = Bacon.interval(500, 1).filter(shouldBlinkProp);
  // Der Rest bleibt unverändert
}

Mit filter erlaubt uns Bacon.js, nur bestimmte Werte in einem Strom durchzulassen. Neben einer Filterfunktion können wir auch, wie hier, eine Property übergeben. Ist sie true, dürfen die Werte des Stroms passieren, sonst nicht:

Bacon.interval:  ....1....1....1....1....1....1
shouldBlinkProp: tttttttttttffffffffffftttttttt
tickStream:      ....1....1..............1....1

Wer mit dem Button etwas herumspielt, wird einen Fehler entdecken. Schalte ich das Blinken aus, während die blinkenden Elemente gerade nicht sichtbar sind, bleiben sie dauerhaft verborgen. Damit die blinkenden Elemente bei ausgeschaltetem Blinken immer sichtbar sind, haken wir uns an anderer Stelle im Strom ein:

var tickTackWithSwitch = function(latestTicktack, shouldBlinkCurrently) {
  return shouldBlinkCurrently ? latestTicktack : 1;
}
var blinkProp = ticktackProp
                .combine(shouldBlinkProp, tickTackWithSwitch)
                .map(toVisibility);

Mit combine erzeugen wir aus 2 Strömen einen neuen. Der neue Strom entsteht durch verschmelzen der jeweils letzten Werte jedes Stroms, die wir an eine entsprechende Kombinierungsfunktion verfüttern (tickTackWithSwitch). In unserem Beispiel kombinieren wir ticktackProp mit shouldBlinkProp, um bei ausgeschaltetem Blinken immer eine 1 im Ergebnisstrom zu haben:

    ticktackProp:    000001111100000111110000011111
    shouldBlinkProp: tttttttttttffffffffffftttttttt
    combined:        000001111101111111111100011111

Wir sollten nun ein Gefühl für die grundlegenden Funktionen (filter, map, scan, combine) eines Observable haben. Auch haben wir gesehen, wie wir das Verhalten durch Zusammenstecken von Strömen leicht ändern und erweitern können. Als nächstes zeige ich Code aus einem unserer echten Projekte.

Ein Beispiel aus der Praxis

In einem unserer neuesten Projekte geben wir Node.js die Gelegenheit, sich als Backend zu bewähren, und haben einen Circuit Breaker mit Bacon.js umgesetzt. Ähnlich einer elektrischen Sicherung, die bei einem überlasteten Stromkreis das gesamte Haus vor dem Abfackeln schützt, kappt der Circuit Breaker eine überlastete Verbindung zu einem entfernten System, damit nicht zu viele Anfragen auflaufen und unnötig Ressourcen fressen. Fehler in einer Komponente werden so früh signalisiert (Fail Fast und pflanzen sich nicht fort.

Ein Circuit Breaker kennt 3 Zustände

  1. Im geschlossenen Zustand lässt er alle Anfragen an das entfernte System passieren. Sollten zu viele Anfragen in Folge scheitern (durch Zeitüberschreitung), wechselt er in den offenen Zustand.

  2. Ist der Circuit Breaker offen, lehnt er alle Anfragen sofort ab und übermittelt nichts an das entfernte System. Nach einer Abkühlperiode landet er im halboffenen Zustand.

  3. Hier taucht der Circuit Breaker den großen Zeh ins Wasser und lässt genau eine Anfrage durch. Scheitert sie, kehrt er in den offenen Zustand zurück und die Warteperiode beginnt von neuem. Ist sie erfolgreich, schließt er den Kreis wieder.

image alt text

Wir steuern unseren Circuit Breaker über 3 Busse sendQueue, successes und failures. Ein Bus ist ein EventStream, der zusätzlich eine push-Methode anbietet, um Werte in den Strom zu schieben. Um den Circuit Breaker in AngularJS zu nutzen, habe ich ihn als $http-Interceptor eingebunden:

angular.module( 'circuitBreakerDemo', [] )
// factory für circuitBreaker zunächst ausgelassen

.factory('circuitBreakerInterceptor', function(circuitBreaker, $q, $log) {

  return {
    request: function (config) {
      var deferred = $q.defer();
      circuitBreaker.sendQueue.push({deferred: deferred, config: config});
      return deferred.promise;
    },

    response: function (response) {
      $log.info("CB", "Response", response);
      circuitBreaker.successes.push(1);
      return response;
    },

    responseError: function (rejection) {
      $log.error("CB", "ResponseError", rejection);
      circuitBreaker.failures.push(1);
      return $q.reject(rejection);
    }
  };
})

.config(['$httpProvider', function($httpProvider) {
    $httpProvider.interceptors.push('circuitBreakerInterceptor');
}]);

Die Funktionen response und responseError des Interceptors sind schnell erklärt: Wir signalisieren dem Circuit Breaker den Erfolg bzw. Fehlschlag einer Anfrage, indem wir eine 1 in den entsprechenden Bus schieben (successes bzw. failures).

Die request-Funktion ist nur wenig anspruchsvoller: Mit $q erzeugen wir zunächst ein neues deferred-Objekt, packen es mit der Anfrage (config) in ein Container-Objekt und schieben dies in die sendQueue. Dann geben wir das zu deferred gehörige promise an AngularJS zurück. Der Circuit Breaker kann nun abhängig von seinem Zustand die Anfrage erlauben (mit deferred.resolve(config)) oder ablehnen (mit deferred.reject(config)) und AngularJS erfährt von der Entscheidung über das zugehörige promise.

Den Code des Circuit Breakers will ich nicht in jedem Detail erklären, weil er überwiegend aus den alten Bekannten filter, map und scan besteht. Der folgende Plunk demonstriert die Funktionsweise des Circuit Breakers und enthält den vollständigen Code.

Plunkr in neuem Fenster öffnen

Ich beleuchte deshalb nur die Stellen, an denen wir den neuen Funktionen throttle, merge und slidingWindow begegnen.

Die Warteperiode, nach der wir vom offenen Zustand in den halboffenen wechseln, können wir leicht mit Bacon.js implementieren:

var retryPing = failures
                .filter(tripped)
                .throttle(cbConfig.waitUntilRetry)

Die Property tripped, die wir an anderer Stelle berechnen, zeigt uns an, ob die Zahl der aufeinanderfolgenden Fehler einen konfigurierbaren Grenzwert erreicht hat. throttle schläft für eine bestimmte Zeit, gibt den letzten Wert des Eingabestroms aus und legt sich wieder hin.

    failures: ..1...1.1..1..........
    tripped:  fffffffftttttttttttttt
    filter:   ........1..1..........
    throttle: .....................1
                      |___Schlaf___|

Der folgende Codeblock entscheidet, ob wir vom halboffenen Zustand in den geschlossenen wechseln:

var halfOpenReceive = retries
                        .map(2)
                        .merge(successes.map(1))
                        .merge(failures.map(3))
                        .slidingWindow(2,2);

var halfOpenToClosedReceive = halfOpenReceive.filter(function(v) {
  return v[0] == 2 && v[1] == 1;
});

halfOpenToClosedReceive.onValue(function() {
  $log.info("CLOSED due to successful retry.");
  recloses.push(1);
});

Der Strom retries enthält für jeden Verbindungsversuch im halboffenen Zustand eine 1. Wir führen dann retries, successes und failures mittels merge zusammen. Das Ergebnis ist ein Strom, in den alle Werte der zusammengeführten Ströme fließen. Da jeder dieser Ströme nur Einsen liefert, bilden wir sie zur Unterscheidung vorher auf verschiedene Werte ab.

retries.map(2):   ..2......2...
successes.map(1): ...........1.
merge:            ..2......2.1.

Um herauszufinden, ob ein Verbindungsversuch erfolgreich war, betrachten wir die letzten beiden Werte des zusammengeführten Stroms und schließen den Kreis wieder, wenn auf einen Verbindungsversuch ein Erfolg folgt. Dazu picken wir mit slidingWindow(2,2)mind. 2 und max. 2, also genau 2 Werte aus dem Strom und schieben sie als Array in den Ausgabestrom.

halfOpenReceive:         [2,3][2,3][2,1]
halfOpenToClosedReceive: .    .    [2,1]

Warum so umständlich? Was habe ich davon?

Meine Erfahrung ist, dass ich mit FRP kompakteren und robusteren Code schreibe, um komplexes Verhalten abzubilden. Zusätzlich treten bei Änderungen/Erweiterungen seltener unerwünschte Seiteneffekte auf.

Ich führe das auf folgende Eigenschaften von FRP zurück:

  • Inversion of Control: Unser Code muss sich nicht um den Kontrollfluss kümmern. Alle hier gezeigten Beispiele kommen ohne Schleifen und if-Anweisungen aus (McCabe würde sich freuen). Mit Dependency Injection begegnet uns eine Anwendung dieses Prinzips bereits bei AngularJS.

  • Vielseitigkeit: Wir programmieren für den allgemeinen Fall, d.h. uns ist egal, ob wir einen Wert oder viele Werte, synchron oder asynchron verarbeiten. Wie unsere Beispiele zeigen, arbeitet der Großteil unseres Codes mit nur ein oder zwei Werten, die er als Funktionsparameter erhält, und bleibt so sehr einfach.

  • Komponierbarkeit: FRP kommt dem Lego-Ideal recht nahe. Observables können wir beliebig zusammenstecken, zerlegen, und neu zusammensetzen. Die Selbstähnlichkeit und Wiederverwendbarkeit unseres Codes steigt, da Komponenten nur über Ein- und Ausgabeströme ohne gemeinsamen Zustand miteinander gekoppelt sind, egal, wie tief wir zoomen.

image alt text

Was sollte ich bei der Entwicklung mit FRP/Bacon.js beachten?

Zum Abschluss gebe ich einige Empfehlungen, die helfen sollen, Fallstricke zu umgehen.

  • Werte nicht ändern! Für uns hat sich bewährt, die Werte in den Strömen als unveränderlich zu behandeln. Sonst riskieren wir schwer nachvollziehbare Seiteneffekte.

  • Zeitabhängige Operationen kapseln! Testen von asynchronem Verhalten ist ein Graus und Konstrukte wie Jasmines Clock sind bestenfalls Krücken. Wir lagern asynchrone Operationen meist gemäß dem Strategie-Muster aus.

Beim Circuit Breaker würden wir die oben vorgestellte throttle-Operation wie folgt kapseln:

var throttler = throttleStrategy || function(observable) {
  return observable.throttle(cbConfig.waitUntilRetry);
}

var retryPing = throttler(failures.filter(tripped));

Übergeben wir keine throttleStrategy, verhält sich die throttler-Funktion einfach wie throttle aus Bacon.js. Für den Testfall können wir jetzt aber anderes Verhalten injizieren und z.B. folgende Strategie wählen:

var output = new Bacon.Bus();
var lastValue;

var throttleStrategy = function(observable) {
  observable.onValue(function(x) {
    lastValue = x;
  });
  return output;
}

Unser Testcode kann nun mit output.push(lastValue) das Verhalten von throttle steuern.

  • Schnittstellen für Monitoring und Management anbieten! Eine große Herausforderung ist, unerwartetes Verhalten nachzuvollziehen. Klassisches Debugging ist nicht sehr wirksam, da der Großteil des Kontrollflusses außerhalb unseres Codes liegt. Logging/Monitoring verdienen besondere Aufmerksamkeit, insb. da in diesem Bereich Bacon.js noch Schwächen zeigt. Logging wie Monitoring können wir über entsprechende Ströme anbieten (wir nutzen einen Strom pro Log-Level).

Besonders in einer Serverumgebung sind Management-Schnittstellen wichtig, z.B. um den Timeout des Circuit Breakers im laufenden Betrieb zu rekonfigurieren. Auch das ist im Falle von Bacon.js mit einem Bus möglich.

Fazit

Im Zusammenspiel mit AngularJS kann Bacon.js den Bau von Oberflächen und Komponenten mit komplexen Interaktionen vereinfachen und unseren Code einfach und kompakt halten. Von den gleichen Konzepten, die AngularJS für Databinding oder Dependency Injection nutzt, können wir nun auch profitieren.

Inhaltsverzeichnis


Email Newsletter

Um alle Neuigkeiten zu erfahren, abonniere hier unseren Newsletter!

Newsletter abonnieren
Ramy Hardan

Ramy Hardan

Ramy Hardan schrieb seine erste Software im Alter von 12 Jahren. Der Schritt in die Selbständigkeit erfolgte schon zu Beginn seines Informatik-Studiums und bis zu dessen Abschluss entstanden etliche Web-Anwendungen, eine mit mehr als 300.000 Nutzern. 2001 begann Ramy Hardan, eigene Werkzeuge zu bauen, mit denen er Teile der Entwicklung automatisierte und Software schneller liefern konnte. Heute generiert er bis zu 80% eines Softwaresystems automatisch. Unter dem Dach der bitzeche GmbH baut Ramy Hardan seit 2010 ein schlagkräftiges Team von Experten auf, das hochskalierbare, ausfallsichere Web- und Mobil-Anwendungen entwickelt.