Event-Streaming für Fahrten der DB Fernverkehr AG und DB Regio AG.
Informationsumfang
Der RiFahrt-Datenstrom beinhaltet planerische Informationen und Echtzeitinformationen zu Fahrten der DB Fernverkehr AG und DB Regio AG, darunter unter anderem:
- Fahrtbezeichnung, Fahrtnummer, Linien, Verwaltungen/Betreiber
- Haltestellen, Ankunfts- und Abfahrtszeiten
- Richtungstexte, Fahrradmitnahme, Rollstuhlmitnahme
- Bahnhöfe, Haltestellen, Gleise
- Durchbindungen, Vereinigungen, Ersatz- und Entlastungsfahrten
Übergreifende, stabile Fahrt-IDs ermöglichen eine einfache Verknüpfung des Datenstroms mit APIs oder GTFS/GTFS-RT Datenströmen der Deutschen Bahn AG.
Der Datenstrom kann maximal bis zu 14 Tage in die Zukunft abonniert werden.
Technische Anbindung
RiFahrt ist ein Event-Streaming-Service der Deutschen Bahn, der kontinuierlich Fahrplandaten und Echtzeitinformationen liefert. Die Daten werden über RabbitMQ als Protocol Buffer bereitgestellt.
Protocol Buffer Setup
1. Proto-Datei erhalten
Die rifahrt.proto Datei wird Ihnen nach Registrierung vom RiFahrt-Team bereitgestellt.
2. Code-Generierung
Hinweis: Alle Beispiele setzen voraus, dass das rifahrt.proto korrekt generiert und eingebunden wurde.
Java
# Mit protoc Compiler
protoc --java_out=./src/main/java \
--java_multiple_files=true \
rifahrt/v1/rifahrt.proto
# Mit Gradle Plugin - siehe: https://github.com/google/protobuf-gradle-plugin
plugins {
id 'com.google.protobuf' version '0.9.4'
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.1"
}
}
# Mit Maven Plugin - siehe: https://github.com/ascopes/protobuf-maven-plugin
<plugin>
<groupId>com.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>2.2.0</version>
</plugin>
Ende des Expander-Inhaltes
Python
# Installation
pip install protobuf grpcio-tools
# Code-Generierung
python -m grpc_tools.protoc \
--python_out=./generated \
--proto_path=. \
rifahrt/v1/rifahrt.proto
Ende des Expander-Inhaltes
Go
# Installation
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# Code-Generierung
protoc --go_out=./generated \
--go_opt=paths=source_relative \
rifahrt/v1/rifahrt.proto
Ende des Expander-Inhaltes
C#/.NET
<!-- In .csproj -->
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.25.1" />
<PackageReference Include="Grpc.Tools" Version="2.59.0">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<Protobuf Include="rifahrt\v1\rifahrt.proto" />
</ItemGroup>
Ende des Expander-Inhaltes
RabbitMQ Verbindung
Verbindungsparameter:
Parameter | Wert | Beschreibung | Host | <wird bereitgestellt> | RabbitMQ Server |
Port | 5671 | SSL/TLS Port |
Username | <wird bereitgestellt> | Ihr Benutzername |
Password | <wird bereitgestellt> | Ihr Passwort |
Virtual Host | <wird bereitgestellt> | Ihr VHost |
Queue | <wird bereitgestellt> | Ihre Queue |
TLS | v1.2+ | Erforderlich |
Implementierungsbeispiele:
Java (Vollständiges Beispiel)
import com.rabbitmq.client.*;
import de.db.ri.rifahrt.v1.*;
import javax.net.ssl.*;
import java.security.KeyStore;
import java.util.concurrent.TimeUnit;
public class RiFahrtConsumer {
private static final String HOST = "<RABBITMQ_HOST>";
private static final int PORT = 5671;
private static final String USERNAME = "<USERNAME>";
private static final String PASSWORD = "<PASSWORD>";
private static final String VIRTUAL_HOST = "<VIRTUAL_HOST>";
private static final String QUEUE_NAME = "<QUEUE_NAME>";
private Connection connection;
private Channel channel;
public void connect() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
// SSL-Konfiguration
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(getClass().getResourceAsStream("/truststore.jks"),
"changeit".toCharArray());
tmf.init(ks);
sslContext.init(null, tmf.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
// Verbindungsparameter
factory.setConnectionTimeout(30000);
factory.setHandshakeTimeout(30000);
factory.setRequestedHeartbeat(60);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// Verbindung herstellen
connection = factory.newConnection("<CONSUMER NAME>");
channel = connection.createChannel();
// Queue-Konfiguration
channel.basicQos(10); // Prefetch für Performance
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
}
public void startConsuming() throws Exception {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
// Event deserialisieren
RiFahrtEvent event = RiFahrtEvent.parseFrom(delivery.getBody());
// Event verarbeiten
processEvent(event);
// Nachricht bestätigen
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("Fehler bei Verarbeitung: " + e.getMessage());
// Bei Fehler: Nachricht zurück in Queue (requeue)
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),
false, true);
}
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("Consumer wurde abgebrochen: " + consumerTag);
};
// Consumer starten
String consumerTag = channel.basicConsume(
QUEUE_NAME,
false, // Auto-ACK deaktiviert
"RiFahrt-Consumer-" + System.currentTimeMillis(),
deliverCallback,
cancelCallback
);
System.out.println("Consumer gestartet mit Tag: " + consumerTag);
}
private void processEvent(RiFahrtEvent event) {
// Meta-Informationen
Meta meta = event.getMeta();
System.out.println("Event ID: " + meta.getId());
System.out.println("Zeitstempel: " + event.getLastChangedTimestamp());
// Polymorphe Datenstruktur
switch (event.getDataCase()) {
case RIFAHRTDATA:
processFahrtData(event.getRiFahrtData());
break;
case RIFAHRTGELOESCHTDATA:
processFahrtGeloescht(event.getRiFahrtGeloeschtData());
break;
case DATA_NOT_SET:
System.err.println("Keine Daten im Event!");
break;
}
}
private void processFahrtData(RiFahrtData data) {
System.out.println("Fahrt ID: " + data.getFahrtId());
// Verkehrsmittel-Informationen
for (Verkehrsmittel vm : data.getAllVerkehrsmittelList()) {
System.out.println("Verkehrsmittel: " +
vm.getGattung() + " " + vm.getFahrtnummer());
}
// Fahrtereignisse verarbeiten
for (Fahrtereignis ereignis : data.getAllFahrtereignisList()) {
processEreignis(ereignis);
}
}
private void processEreignis(Fahrtereignis ereignis) {
HaltestelleInFahrt haltestelle = ereignis.getHaltestelle();
System.out.println("\nHaltestelle: " + haltestelle.getName() +
" (" + haltestelle.getEvaNummer() + ")");
// Zeiten
System.out.println(" Soll: " + ereignis.getZeitPlan());
System.out.println(" Ist/Prognose: " + ereignis.getZeit());
System.out.println(" Zeittyp: " + ereignis.getZeitTyp());
// Gleis/Plattform
if (ereignis.hasPlattform()) {
System.out.println(" Gleis: " + ereignis.getPlattform());
if (ereignis.hasPlattformPlan() &&
!ereignis.getPlattform().equals(ereignis.getPlattformPlan())) {
System.out.println(" Gleiswechsel von: " +
ereignis.getPlattformPlan());
}
}
// Status
if (ereignis.hasAusgefallen() && ereignis.getAusgefallen()) {
System.out.println(" ⚠️ AUSGEFALLEN");
}
if (ereignis.hasZusaetzlich() && ereignis.getZusaetzlich()) {
System.out.println(" ➕ ZUSÄTZLICHER HALT");
}
}
// Graceful Shutdown
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close(5000);
}
} catch (Exception e) {
System.err.println("Fehler beim Schließen: " + e.getMessage());
}
}
}
Ende des Expander-Inhaltes
Spring Boot Integration
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Value("${rifahrt.rabbitmq.host}")
private String host;
@Value("${rifahrt.rabbitmq.port}")
private int port;
@Value("${rifahrt.rabbitmq.username}")
private String username;
@Value("${rifahrt.rabbitmq.password}")
private String password;
@Value("${rifahrt.rabbitmq.virtualHost}")
private String virtualHost;
@Value("${rifahrt.rabbitmq.queue}")
private String queueName;
@Bean
public ConnectionFactory connectionFactory() throws Exception {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
// SSL-Konfiguration
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setTlsVersion("TLSv1.2");
factoryBean.setSslAlgorithm("TLSv1.2");
factoryBean.setKeyStore("classpath:keystore.jks");
factoryBean.setKeyStorePassphrase("changeit");
factoryBean.setTrustStore("classpath:truststore.jks");
factoryBean.setTrustStorePassphrase("changeit");
factoryBean.afterPropertiesSet();
factory.setConnectionFactory(factoryBean.getObject());
return factory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setPrefetchCount(10);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(
RiFahrtMessageHandler messageHandler) {
return new MessageListenerAdapter(messageHandler, "handleMessage");
}
}
@Component
public class RiFahrtMessageHandler {
public void handleMessage(Message message, Channel channel)
throws Exception {
try {
// Protobuf deserialisieren
RiFahrtEvent event = RiFahrtEvent.parseFrom(message.getBody());
// Event verarbeiten
processEvent(event);
// ACK senden
channel.basicAck(message.getMessageProperties()
.getDeliveryTag(), false);
} catch (Exception e) {
// NACK mit requeue
channel.basicNack(message.getMessageProperties()
.getDeliveryTag(), false, true);
throw e;
}
}
private void processEvent(RiFahrtEvent event) {
// Implementierung der Event-Verarbeitung
}
}
Ende des Expander-Inhaltes
Event-Verarbeitung
Wichtige Konzepte:
- Polymorphe Events: Jedes Event kann entweder RiFahrtData (Update) oder RiFahrtGeloeschtData (Löschung) enthalten
- Message Acknowledgement: Bestätigen Sie jede Nachricht nach erfolgreicher Verarbeitung
- Fehlerbehebung: Im Fehlerfall sollte die Nachricht mit nack (negative acknowledgement) zurück in die Queue gestellt werden
Datenmodell im Detail
Event-Struktur
RiFahrtEvent
├── Meta (Technische Metadaten)
│ ├── id: Event-ID
│ ├── owner: Datenherkunft
│ ├── created: Erstellungszeitpunkt
│ └── correlation: Verknüpfte Events
├── lastChangedTimestamp: Letzte Änderung
└── data (one of):
├── RiFahrtData: Fahrtdaten
└── RiFahrtGeloeschtData: Löschung
Wichtige Datentypen
Fahrtereignis:
- Typ:
- Zeiten:
- zeitPlan: Geplante Zeit
- zeit: Aktuelle Zeit (Ist/Prognose)
- zeitTyp: PLAN, PROGNOSE oder ECHT
- Status-Flags:
- ausgefallen: Halt entfällt
- zusaetzlich: Zusätzlicher Halt
- keinFahrgastwechsel: Kein Ein-/Ausstieg möglich
Nachrichten-System
Das Nachrichten-System verwendet IDs zur Deduplizierung:
- Störungskommunikation: Betriebliche Störungen
- Hinweise: Allgemeine Informationen
- Attribute: Fahrtmerkmale (z.B. Fahrradmitnahme)
- RIS-Kundengründe: Standardisierte Verspätungsgründe
Best Practices
Null-Werte in Protobuf
⚠️ Wichtig: Protocol Buffers unterstützen keine klassischen null-Werte!
Default-Werte statt null: Nicht gesetzte Felder haben immer einen Default-Wert:
- Numerische Typen: 0
- Strings: " " (leerer String)
- Booleans: false
- Enums: Der erste definierte Wert (normalerweise 0)
- Messages: null (hier die einzige Ausnahme)
hasXXX() Methoden verwenden: Um zu prüfen, ob ein Feld explizit gesetzt wurde:
// Richtig: Prüfen ob Feld gesetzt wurde
if (ereignis.hasPlattform()) {
String gleis = ereignis.getPlattform();
}
// Falsch: Direkt auf null prüfen (funktioniert nicht für primitive Felder)
if (ereignis.getPlattform() != null) { // String ist nie null!
// ...
}
JSON-Konvertierung Besonderheiten
⚠️Wichtig: Bei der Konvertierung von Protobuf nach JSON werden Felder mit Default-Werten standardmäßig nicht ausgegeben!
Problem: Ein nicht gesetztes Feld und ein explizit auf Default gesetztes Feld sind nicht unterscheidbar.
Beispiel:
// Protobuf Message
Fahrtereignis ereignis = Fahrtereignis.newBuilder()
.setZeit("10:30")
// ausgefallen wird nicht gesetzt (default: false)
.build();
// JSON-Ausgabe (Standard)
{
"zeit": "10:30"
// "ausgefallen" fehlt, da es den Default-Wert false hat!
}
Lösung für vollständige JSON-Ausgabe:
// Java: Mit JsonFormat includingDefaultValueFields
String json = JsonFormat.printer()
.includingDefaultValueFields()
.print(ereignis);
// Resultat:
{
"zeit": "10:30",
"ausgefallen": false // Jetzt enthalten!
}
Robuste Verbindungsverwaltung
// Automatisches Reconnect aktivieren
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setTopologyRecoveryEnabled(true);
// Heartbeat für Verbindungsüberwachung
factory.setRequestedHeartbeat(60);
Effiziente Event-Verarbeitung
- Batch-Verarbeitung: Verarbeiten Sie mehrere Events gleichzeitig
- Asynchrone Verarbeitung: Nutzen Sie Thread-Pools für CPU-intensive Aufgaben
- Caching: Speichern Sie häufig benötigte Daten (z.B. Stationsinformationen)
Monitoring und Logging:
// Strukturiertes Logging mit Event-Context
MDC.put("eventId", event.getMeta().getId());
MDC.put("fahrtId", fahrtData.getFahrtId());
logger.info("Event verarbeitet");
MDC.clear();
// Metriken erfassen
meterRegistry.counter("rifahrt.events.processed").increment();
meterRegistry.timer("rifahrt.processing.time").record(duration);
Fehlerbehandlung:
// Retry-Strategie implementieren
@Retryable(value = {Exception.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000))
public void processEvent(RiFahrtEvent event) {
// Verarbeitung mit automatischen Wiederholungen
}
// Dead Letter Queue für fehlgeschlagene Nachrichten
channel.exchangeDeclare("dlx", "direct", true);
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
channel.queueDeclare(queueName, true, false, false, args);
Troubleshooting:
SSL/TLS Verbindungsfehler
Problem: SSLHandshakeException oder CertificateException
Lösung:
- Stellen Sie sicher, dass Sie TLS 1.2 oder höher verwenden
- Überprüfen Sie, ob das Truststore-Zertifikat aktuell ist
- Validieren Sie die SSL-Konfiguration
// Debug-Ausgaben aktivieren
System.setProperty("javax.net.debug", "ssl:handshake");
// Explizite TLS-Version
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
Ende des Expander-Inhaltes
Keine Events empfangen
Problem: Consumer läuft, aber keine Events kommen an
Checkliste:
- Queue-Name korrekt?
- Virtual Host korrekt?
- Consumer erfolgreich registriert?
- Prefetch-Count gesetzt?
// Debugging
channel.queueDeclarePassive(queueName); // Prüft ob Queue existiert
long messageCount = channel.messageCount(queueName);
System.out.println("Nachrichten in Queue: " + messageCount);
Ende des Expander-Inhaltes
Protobuf Deserialisierungsfehler
Problem: InvalidProtocolBufferException
Mögliche Ursachen:
- Veraltete Proto-Definition
- Falsche Nachrichtenformatierung
- Korrupte Nachricht
Lösung:
try {
RiFahrtEvent event = RiFahrtEvent.parseFrom(body);
} catch (InvalidProtocolBufferException e) {
// Hex-Dump für Debugging
logger.error("Fehlerhafte Nachricht: " +
DatatypeConverter.printHexBinary(body));
// An Dead Letter Queue senden
}
Ende des Expander-Inhaltes
Memory-Probleme bei hohem Durchsatz
Problem: OutOfMemoryError oder hohe Memory-Nutzung
Optimierung:
// 1. Prefetch limitieren
channel.basicQos(10); // Nicht zu viele unbestätigte Nachrichten
// 2. Streaming-Verarbeitung
event.getAllFahrtereignisList().stream()
.parallel()
.forEach(this::processEreignis);
// 3. Objekt-Pooling für häufige Operationen
private final ObjectPool<DateTimeFormatter> formatterPool =
new GenericObjectPool<>(new DateTimeFormatterFactory());
Ende des Expander-Inhaltes
Kontakt
Bei Fragen oder Feedback zum RiFahrt-Datenstrom: ris-datenstroeme@deutschebahn.com
Link