RiFahrt

Zum Inhalt springen

Event-Streaming für Fahrten der DB Fernverkehr AG und DB Regio AG.

Informationsumfang

Der RiFahrt-Datenstrom beinhaltet planerische Informationen und Echtzeitinformationen zu Fahrten der DB Fernverkehr AG und DB Regio AG, darunter unter anderem:

  • Fahrtbezeichnung, Fahrtnummer, Linien, Verwaltungen/Betreiber
  • Haltestellen, Ankunfts- und Abfahrtszeiten
  • Richtungstexte, Fahrradmitnahme, Rollstuhlmitnahme
  • Bahnhöfe, Haltestellen, Gleise
  • Durchbindungen, Vereinigungen, Ersatz- und Entlastungsfahrten

Übergreifende, stabile Fahrt-IDs ermöglichen eine einfache Verknüpfung des Datenstroms mit APIs oder GTFS/GTFS-RT Datenströmen der Deutschen Bahn AG.

Der Datenstrom kann maximal bis zu 14 Tage in die Zukunft abonniert werden.

Technische Anbindung

RiFahrt ist ein Event-Streaming-Service der Deutschen Bahn, der kontinuierlich Fahrplandaten und Echtzeitinformationen liefert. Die Daten werden über RabbitMQ als Protocol Buffer bereitgestellt.

Protocol Buffer Setup

1. Proto-Datei erhalten

Die rifahrt.proto Datei wird Ihnen nach Registrierung vom RiFahrt-Team bereitgestellt.

2. Code-Generierung

Hinweis: Alle Beispiele setzen voraus, dass das rifahrt.proto korrekt generiert und eingebunden wurde.

Java
# Mit protoc Compiler
protoc --java_out=./src/main/java \
       --java_multiple_files=true \
       rifahrt/v1/rifahrt.proto
# Mit Gradle Plugin - siehe: https://github.com/google/protobuf-gradle-plugin
plugins {
    id 'com.google.protobuf' version '0.9.4'
}
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
}
# Mit Maven Plugin - siehe: https://github.com/ascopes/protobuf-maven-plugin
<plugin>
    <groupId>com.github.ascopes</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>2.2.0</version>
</plugin>
Ende des Expander-Inhaltes
Python
# Installation
pip install protobuf grpcio-tools
# Code-Generierung
python -m grpc_tools.protoc \
    --python_out=./generated \
    --proto_path=. \
    rifahrt/v1/rifahrt.proto
Ende des Expander-Inhaltes
Go
# Installation
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# Code-Generierung
protoc --go_out=./generated \
       --go_opt=paths=source_relative \
       rifahrt/v1/rifahrt.proto
Ende des Expander-Inhaltes
C#/.NET
<!-- In .csproj -->
<ItemGroup>
  <PackageReference Include="Google.Protobuf" Version="3.25.1" />
  <PackageReference Include="Grpc.Tools" Version="2.59.0">
    <PrivateAssets>all</PrivateAssets>
  </PackageReference>
</ItemGroup>
<ItemGroup>
  <Protobuf Include="rifahrt\v1\rifahrt.proto" />
</ItemGroup>
Ende des Expander-Inhaltes

RabbitMQ Verbindung

Verbindungsparameter:

Parameter

Wert

Beschreibung

Host

<wird bereitgestellt>

RabbitMQ Server

Port

5671

SSL/TLS Port

Username

<wird bereitgestellt>

Ihr Benutzername

Password

<wird bereitgestellt>

Ihr Passwort

Virtual Host

<wird bereitgestellt>

Ihr VHost

Queue

<wird bereitgestellt>

Ihre Queue

TLS

v1.2+

Erforderlich

Implementierungsbeispiele:

Java (Vollständiges Beispiel)
import com.rabbitmq.client.*;
import de.db.ri.rifahrt.v1.*;
import javax.net.ssl.*;
import java.security.KeyStore;
import java.util.concurrent.TimeUnit;
public class RiFahrtConsumer {
    private static final String HOST = "<RABBITMQ_HOST>";
    private static final int PORT = 5671;
    private static final String USERNAME = "<USERNAME>";
    private static final String PASSWORD = "<PASSWORD>";
    private static final String VIRTUAL_HOST = "<VIRTUAL_HOST>";
    private static final String QUEUE_NAME = "<QUEUE_NAME>";
    
    private Connection connection;
    private Channel channel;
    
    public void connect() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
        
        // SSL-Konfiguration
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(getClass().getResourceAsStream("/truststore.jks"), 
                "changeit".toCharArray());
        tmf.init(ks);
        sslContext.init(null, tmf.getTrustManagers(), null);
        factory.useSslProtocol(sslContext);
        
        // Verbindungsparameter
        factory.setConnectionTimeout(30000);
        factory.setHandshakeTimeout(30000);
        factory.setRequestedHeartbeat(60);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        
        // Verbindung herstellen
        connection = factory.newConnection("<CONSUMER NAME>");
        channel = connection.createChannel();
        
        // Queue-Konfiguration
        channel.basicQos(10); // Prefetch für Performance
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    }
    
    public void startConsuming() throws Exception {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                // Event deserialisieren
                RiFahrtEvent event = RiFahrtEvent.parseFrom(delivery.getBody());
                
                // Event verarbeiten
                processEvent(event);
                
                // Nachricht bestätigen
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                
            } catch (Exception e) {
                System.err.println("Fehler bei Verarbeitung: " + e.getMessage());
                // Bei Fehler: Nachricht zurück in Queue (requeue)
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), 
                                 false, true);
            }
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Consumer wurde abgebrochen: " + consumerTag);
        };
        
        // Consumer starten
        String consumerTag = channel.basicConsume(
            QUEUE_NAME, 
            false, // Auto-ACK deaktiviert
            "RiFahrt-Consumer-" + System.currentTimeMillis(),
            deliverCallback, 
            cancelCallback
        );
        
        System.out.println("Consumer gestartet mit Tag: " + consumerTag);
    }
    
    private void processEvent(RiFahrtEvent event) {
        // Meta-Informationen
        Meta meta = event.getMeta();
        System.out.println("Event ID: " + meta.getId());
        System.out.println("Zeitstempel: " + event.getLastChangedTimestamp());
        
        // Polymorphe Datenstruktur
        switch (event.getDataCase()) {
            case RIFAHRTDATA:
                processFahrtData(event.getRiFahrtData());
                break;
                
            case RIFAHRTGELOESCHTDATA:
                processFahrtGeloescht(event.getRiFahrtGeloeschtData());
                break;
                
            case DATA_NOT_SET:
                System.err.println("Keine Daten im Event!");
                break;
        }
    }
    
    private void processFahrtData(RiFahrtData data) {
        System.out.println("Fahrt ID: " + data.getFahrtId());
        
        // Verkehrsmittel-Informationen
        for (Verkehrsmittel vm : data.getAllVerkehrsmittelList()) {
            System.out.println("Verkehrsmittel: " + 
                vm.getGattung() + " " + vm.getFahrtnummer());
        }
        
        // Fahrtereignisse verarbeiten
        for (Fahrtereignis ereignis : data.getAllFahrtereignisList()) {
            processEreignis(ereignis);
        }
    }
    
    private void processEreignis(Fahrtereignis ereignis) {
        HaltestelleInFahrt haltestelle = ereignis.getHaltestelle();
        System.out.println("\nHaltestelle: " + haltestelle.getName() + 
                          " (" + haltestelle.getEvaNummer() + ")");
        
        // Zeiten
        System.out.println("  Soll: " + ereignis.getZeitPlan());
        System.out.println("  Ist/Prognose: " + ereignis.getZeit());
        System.out.println("  Zeittyp: " + ereignis.getZeitTyp());
        
        // Gleis/Plattform
        if (ereignis.hasPlattform()) {
            System.out.println("  Gleis: " + ereignis.getPlattform());
            if (ereignis.hasPlattformPlan() && 
                !ereignis.getPlattform().equals(ereignis.getPlattformPlan())) {
                System.out.println("  Gleiswechsel von: " + 
                                  ereignis.getPlattformPlan());
            }
        }
        
        // Status
        if (ereignis.hasAusgefallen() && ereignis.getAusgefallen()) {
            System.out.println("  ⚠️ AUSGEFALLEN");
        }
        if (ereignis.hasZusaetzlich() && ereignis.getZusaetzlich()) {
            System.out.println("  ➕ ZUSÄTZLICHER HALT");
        }
    }
    
    // Graceful Shutdown
    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close(5000);
            }
        } catch (Exception e) {
            System.err.println("Fehler beim Schließen: " + e.getMessage());
        }
    }
}
Ende des Expander-Inhaltes
Spring Boot Integration
@Configuration
@EnableRabbit
public class RabbitMQConfig {
    
    @Value("${rifahrt.rabbitmq.host}")
    private String host;
    
    @Value("${rifahrt.rabbitmq.port}")
    private int port;
    
    @Value("${rifahrt.rabbitmq.username}")
    private String username;
    
    @Value("${rifahrt.rabbitmq.password}")
    private String password;
    
    @Value("${rifahrt.rabbitmq.virtualHost}")
    private String virtualHost;
    
    @Value("${rifahrt.rabbitmq.queue}")
    private String queueName;
    
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        
        // SSL-Konfiguration
        RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
        factoryBean.setUseSSL(true);
        factoryBean.setTlsVersion("TLSv1.2");
        factoryBean.setSslAlgorithm("TLSv1.2");
        factoryBean.setKeyStore("classpath:keystore.jks");
        factoryBean.setKeyStorePassphrase("changeit");
        factoryBean.setTrustStore("classpath:truststore.jks");
        factoryBean.setTrustStorePassphrase("changeit");
        factoryBean.afterPropertiesSet();
        
        factory.setConnectionFactory(factoryBean.getObject());
        return factory;
    }
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = 
            new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(10);
        return container;
    }
    
    @Bean
    public MessageListenerAdapter listenerAdapter(
            RiFahrtMessageHandler messageHandler) {
        return new MessageListenerAdapter(messageHandler, "handleMessage");
    }
}
@Component
public class RiFahrtMessageHandler {
    
    public void handleMessage(Message message, Channel channel) 
            throws Exception {
        try {
            // Protobuf deserialisieren
            RiFahrtEvent event = RiFahrtEvent.parseFrom(message.getBody());
            
            // Event verarbeiten
            processEvent(event);
            
            // ACK senden
            channel.basicAck(message.getMessageProperties()
                                   .getDeliveryTag(), false);
        } catch (Exception e) {
            // NACK mit requeue
            channel.basicNack(message.getMessageProperties()
                                    .getDeliveryTag(), false, true);
            throw e;
        }
    }
    
    private void processEvent(RiFahrtEvent event) {
        // Implementierung der Event-Verarbeitung
    }
}
Ende des Expander-Inhaltes

Event-Verarbeitung

Wichtige Konzepte:

  1. Polymorphe Events: Jedes Event kann entweder RiFahrtData (Update) oder RiFahrtGeloeschtData (Löschung) enthalten
  2. Message Acknowledgement: Bestätigen Sie jede Nachricht nach erfolgreicher Verarbeitung
  3. Fehlerbehebung: Im Fehlerfall sollte die Nachricht mit nack (negative acknowledgement) zurück in die Queue gestellt werden

Datenmodell im Detail

Event-Struktur

RiFahrtEvent
├── Meta (Technische Metadaten)
│   ├── id: Event-ID
│   ├── owner: Datenherkunft
│   ├── created: Erstellungszeitpunkt
│   └── correlation: Verknüpfte Events
├── lastChangedTimestamp: Letzte Änderung
└── data (one of):
    ├── RiFahrtData: Fahrtdaten
    └── RiFahrtGeloeschtData: Löschung

Wichtige Datentypen

Fahrtereignis:

  • Typ:
    • ANKUNFT
    • ABFAHRT
  • Zeiten:
    • zeitPlan: Geplante Zeit
    • zeit: Aktuelle Zeit (Ist/Prognose)
    • zeitTyp: PLAN, PROGNOSE oder ECHT
  • Status-Flags:
    • ausgefallen: Halt entfällt
    • zusaetzlich: Zusätzlicher Halt
    • keinFahrgastwechsel: Kein Ein-/Ausstieg möglich

Nachrichten-System

Das Nachrichten-System verwendet IDs zur Deduplizierung: 

  • Störungskommunikation: Betriebliche Störungen
  • Hinweise: Allgemeine Informationen
  • Attribute: Fahrtmerkmale (z.B. Fahrradmitnahme)
  • RIS-Kundengründe: Standardisierte Verspätungsgründe

Best Practices

Null-Werte in Protobuf

⚠️ Wichtig: Protocol Buffers unterstützen keine klassischen null-Werte!

Default-Werte statt null: Nicht gesetzte Felder haben immer einen Default-Wert:

  • Numerische Typen: 0 
  • Strings: " " (leerer String)
  • Booleans: false
  • Enums: Der erste definierte Wert (normalerweise 0)
  • Messages: null (hier die einzige Ausnahme)

hasXXX() Methoden verwenden: Um zu prüfen, ob ein Feld explizit gesetzt wurde:

// Richtig: Prüfen ob Feld gesetzt wurde
if (ereignis.hasPlattform()) {
    String gleis = ereignis.getPlattform();
}
// Falsch: Direkt auf null prüfen (funktioniert nicht für primitive Felder)
if (ereignis.getPlattform() != null) { // String ist nie null!
    // ...
}

JSON-Konvertierung Besonderheiten

⚠️Wichtig: Bei der Konvertierung von Protobuf nach JSON werden Felder mit Default-Werten standardmäßig nicht ausgegeben!

Problem: Ein nicht gesetztes Feld und ein explizit auf Default gesetztes Feld sind nicht unterscheidbar.

Beispiel:

// Protobuf Message
Fahrtereignis ereignis = Fahrtereignis.newBuilder()
    .setZeit("10:30")
    // ausgefallen wird nicht gesetzt (default: false)
    .build();
// JSON-Ausgabe (Standard)
{
  "zeit": "10:30"
  // "ausgefallen" fehlt, da es den Default-Wert false hat!
}

Lösung für vollständige JSON-Ausgabe:

// Java: Mit JsonFormat includingDefaultValueFields
String json = JsonFormat.printer()
    .includingDefaultValueFields()
    .print(ereignis);
// Resultat:
{
  "zeit": "10:30",
  "ausgefallen": false  // Jetzt enthalten!
}

Robuste Verbindungsverwaltung

// Automatisches Reconnect aktivieren
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setTopologyRecoveryEnabled(true);
// Heartbeat für Verbindungsüberwachung
factory.setRequestedHeartbeat(60);

Effiziente Event-Verarbeitung

  • Batch-Verarbeitung: Verarbeiten Sie mehrere Events gleichzeitig
  • Asynchrone Verarbeitung: Nutzen Sie Thread-Pools für CPU-intensive Aufgaben
  • Caching: Speichern Sie häufig benötigte Daten (z.B. Stationsinformationen)

Monitoring und Logging:

// Strukturiertes Logging mit Event-Context
MDC.put("eventId", event.getMeta().getId());
MDC.put("fahrtId", fahrtData.getFahrtId());
logger.info("Event verarbeitet");
MDC.clear();
// Metriken erfassen
meterRegistry.counter("rifahrt.events.processed").increment();
meterRegistry.timer("rifahrt.processing.time").record(duration);

Fehlerbehandlung:

// Retry-Strategie implementieren
@Retryable(value = {Exception.class}, 
           maxAttempts = 3, 
           backoff = @Backoff(delay = 1000))
public void processEvent(RiFahrtEvent event) {
    // Verarbeitung mit automatischen Wiederholungen
}
// Dead Letter Queue für fehlgeschlagene Nachrichten
channel.exchangeDeclare("dlx", "direct", true);
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
channel.queueDeclare(queueName, true, false, false, args);

Troubleshooting:

SSL/TLS Verbindungsfehler

Problem: SSLHandshakeException oder CertificateException

Lösung:

  1. Stellen Sie sicher, dass Sie TLS 1.2 oder höher verwenden
  2. Überprüfen Sie, ob das Truststore-Zertifikat aktuell ist
  3. Validieren Sie die SSL-Konfiguration
// Debug-Ausgaben aktivieren
System.setProperty("javax.net.debug", "ssl:handshake");
// Explizite TLS-Version
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
Ende des Expander-Inhaltes
Keine Events empfangen

Problem: Consumer läuft, aber keine Events kommen an

Checkliste:

  1. Queue-Name korrekt?
  2. Virtual Host korrekt?
  3. Consumer erfolgreich registriert?
  4. Prefetch-Count gesetzt?
// Debugging
channel.queueDeclarePassive(queueName); // Prüft ob Queue existiert
long messageCount = channel.messageCount(queueName);
System.out.println("Nachrichten in Queue: " + messageCount);
Ende des Expander-Inhaltes
Protobuf Deserialisierungsfehler

Problem: InvalidProtocolBufferException

Mögliche Ursachen:

  1. Veraltete Proto-Definition
  2. Falsche Nachrichtenformatierung
  3. Korrupte Nachricht

Lösung:

try {
    RiFahrtEvent event = RiFahrtEvent.parseFrom(body);
} catch (InvalidProtocolBufferException e) {
    // Hex-Dump für Debugging
    logger.error("Fehlerhafte Nachricht: " + 
                 DatatypeConverter.printHexBinary(body));
    // An Dead Letter Queue senden
}
Ende des Expander-Inhaltes
Memory-Probleme bei hohem Durchsatz

Problem: OutOfMemoryError oder hohe Memory-Nutzung

Optimierung:

// 1. Prefetch limitierenchannel.basicQos(10); // Nicht zu viele unbestätigte Nachrichten

// 2. Streaming-Verarbeitungevent.getAllFahrtereignisList().stream()    .parallel()    .forEach(this::processEreignis);

// 3. Objekt-Pooling für häufige Operationenprivate final ObjectPool<DateTimeFormatter> formatterPool =     new GenericObjectPool<>(new DateTimeFormatterFactory());

Ende des Expander-Inhaltes

Kontakt

Bei Fragen oder Feedback zum RiFahrt-Datenstrom: ris-datenstroeme@deutschebahn.com

Link