Artikel: AsyncAPI Technologie
Spezifikation, Dokumentation und Code-Generierung mit AsyncAPI
AsyncAPI beschreibt Schnittstellen und Antwortstrukturen für asynchrone APIs, wie zum Beispiel Messaging-Systeme, Event-Broker und dergleichen. Die APIs der Deutschen Bahn nutzen AsyncAPI um ein Event-System zu definieren, welches synchronen Pull-APIs asynchrone Push-Nachrichten zur Seite stellt und damit eine reaktive Programmierung ermöglicht.
So können zum Beispiel Use-Cases wie
- aktives Nachladen von Webseiten oder Apps im Falle einer Fahrtänderung
- Anzeigen von Großstörungen im Falle der Veröffentlichung von Großstörungsinformationen
- Zwischenspeichern (cachen) von Wagenreihungsinformationen und ereignisbasiertes Aktualisieren des Caches
realisiert werden.
Code-Generierung
Die AsyncAPI Spezifikation kann zur automatisierten Code-Generierung für eine große Anzahl von Programmiersprachen und Frameworks verwendet werden. Damit können selbst für umfangreiche Antwortstrukturen in wenigen Minuten Clients automatisiert erzeugt und im folgenden direkt verwendet werden.
Zur Code-Generierung empfehlen wir den AsyncAPI Generator und verweisen auf dessen umfangreiche Dokumentation mit zahlreichen Beispielen. Am Beispiel der Programmiersprache Java lassen sich Modelklassen z.B. wie folgt generieren:
asyncapi generate models java --javaJackson
--packageName=de.db.ris.events
--output=<OUTPUT_DIRECTORY>
<PATH_TO_YOUR_SPEC>
MQTT Client verbinden
Um das Eventsystem in einer Backend 2 Backend Kommunikation mit Lastverteilung und Ausfallsicherheit nutzen zu können, muss eine MQTT Verbindung mit dem Broker aufgebaut werden.
Hier ist darauf zu achten, dass
- eine Shared-Subscription verwendet wird, um die Events auf mehrere Maschinen zu verteilen
- eine Durable Shared-Subscription verwendet wird, damit Events im Falle eines kurzzeitigen Verbindungsabbruch vom Broker aufbewahrt werden
- der Session Timeout korrekt gesetzt ist, damit der Broker weiß, wie lange er ein Abonnement im Falle eines Verbindungsabbruchs aufrecht halten muss (Achtung: Bitte sinnvolle kurze Zeiten verwenden)
- mit CleanStart = false verbunden wird, damit eine vorherige Durable Shared-Subscription ggf. wieder aufgenommen werden kann
- mit eindeutigen ClientIDs verbunden wird, da sich verschiedene Maschinen sonst gegenseitig disconnecten
- QualityOfService = 1 verwendet wird
Am Beispiel der Programmiersprache Java und der MQTT-Library Eclipse Paho geht dies z.B. so:
import org.eclipse.paho.mqttv5.*;
final String sClientID = "<ID_MUST_BE_UNIQUE_ACCROSS_ALL_CLIENTS>";
final MqttClient oClient = new MqttClient("<ADDRESS_OF_MQTT_BROKER>", sClientID,
new MqttDefaultFilePersistence("/tmp"));
final MqttConnectionOptions oConnectionOptions = new MqttConnectionOptions();
oConnectionOptions.setCleanStart(false);
oConnectionOptions.setSessionExpiryInterval(Long.valueOf(<DURATION_TO_KEEP_SESSION>));
oConnectionOptions.setAutomaticReconnect(true);
oConnectionOptions.setUserName("<USERNAME>");
oConnectionOptions.setPassword("<PASSWORD>".getBytes());
final SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(someSeriousKeyManager, someSeriousTrustManager,
new java.security.SecureRandom());
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
oConnectionOptions.setHttpsHostnameVerificationEnabled(false);
oConnectionOptions.setSocketFactory(sslSocketFactory);
oClient.connect(oConnectionOptions);
Das Subscriben auf alle JourneyChanged Events geht z.B. wie folgt:
IMqttToken oToken = oClient.subscribe("$share/<UNIQUE_STABLE_GROUP_IDENTIFIER_PER_ENVIRONMENT>/RISAPI/PROD/journeys/v1/#", 1);
if (oToken.isComplete() && oToken.getReasonCodes()[0] == 135) {
LOGGER.error("not authorized to subscribe");
}
Achtung: Der Callback für dem Empfang der Nachrichten muss vorher registriert werden.
WebSocket Client verbinden
Um das Eventsystem in einer Backend 2 Frontend Kommunikation (z.B. auf einer Webseite oder in einer App) nutzen zu können, muss eine WebSocket Verbindung mit dem Broker aufgebaut werden. Am Beispiel der Programmiersprache JavaScript geht dies z.B. wie folgt:
const clientId = '<ID_MUST_BE_UNIQUE_ACCROSS_ALL_CLIENTS>';
const client = mqtt.connect({
clientId,
protocol: 'wss',
hostname: '<ADDRESS_OF_MQTT_BROKER>',
port: <PORT_OF_MQTT_BROKER>,
username: '<USERNAME>',
password: '<PASSWORD>',
});
Das subscriben auf alle DisruptiponCommunicationChanged Events geht dann z.B. wie folgt:
client.on('connect', () => {
client.subscribe('RISAPI/PROD/disruption-communications/v1/#');
});
Links
- AsyncAPI Spezifikation
- AsyncAPI CLI Generator
- AsyncAPI Studio zur Visualisierung