Solution de s-celles pour Very Cute Data

intro hardware bus de communication

13 décembre 2024

Table des matières

Le but est d’extraire des données binaires (D0) échantillonnées sur les fronts descendants d’une horloge (D1) dans un intervalle de temps spécifique (entre 350 µs et 2000 µs).

Une première observation avec PulseView nous donne :

Ou en zoomant

C’est bien joli mais ça ne va pas beaucoup nous aider…

Le sujet du challenge nous indique que l’on a tout intérêt à automatiser l’extraction des données et évoque la bibliothèque Python pyDigitalWaveTools.

La photo nous montre un joli sac de noeuds… essayons de faire en sorte que notre code n’y ressemble pas !

  1. Lecture du fichier VCD :

Nous pouvons lire le fichier au format Vcd.

from pyDigitalWaveTools.vcd.parser import VcdParser

fname = "very-cute-data.vcd"
with open(fname) as fd:
    vcd = VcdParser()
    vcd.parse(fd)
data = vcd.scope.toJson()  # Convertit en dictionnaire Python

Les données de nos signaux sont stockées dans un dictionnaire Python. S’agissant d’un traitement séquentiel des données il peut être judicieux de remettre tout cela dans l’ordre chronologique.

  1. Fusion ordonnée des signaux :
def merge_signals_general(data):
    """Combine les différents signaux en un seul flux ordonné par temps"""
    # Utilise une PriorityQueue pour maintenir l'ordre chronologique
    # Retourne des tuples (temps, valeur, nom_signal, chemin)
  1. Extraction des données :
def extract_signal(
    data: Dict[str, Any], start_time: int = None, end_time: int = None
) -> str:
    """
    Extrait les bits de D0 échantillonnés sur les fronts descendants de D1
    entre start_time et end_time (en µs)
    """
    d0, d0_prev = None, None
    d1, d1_prev = None, None
    result = []

    for time, value, signal_name, path in merge_signals_general(data):
        # Conversion ns -> µs
        time_us = time / 1000

        # Mise à jour des signaux
        if signal_name == "D0":
            d0, d0_prev = value, d0
        elif signal_name == "D1":
            d1, d1_prev = value, d1

        # Ignore avant l'intervalle
        if start_time is not None and time_us < start_time:
            continue

        # Arrête après l'intervalle
        if end_time is not None and time_us > end_time:
            break

        # Échantillonnage sur front descendant de D1
        if signal_name == "D1" and d1_prev == "1" and d1 == "0":
            if d0 is not None:
                result.append(d0)

    return "".join(result)

# Construction du flag
flag_bits = extract_signal(data)
flag = f"FCSC{{{flag_bits}}}"
print(flag)

Points clés de la solution :

  1. Format temporel : Le fichier VCD utilise des nanosecondes (ns), il faut donc diviser par 1000 pour avoir des microsecondes (µs)

  2. Synchronisation : Les données sont échantillonnées sur les fronts descendants de l’horloge (transition 1->0 de D1)

  3. État des signaux : On maintient l’état actuel et précédent de chaque signal pour détecter les transitions

  4. Fenêtrage temporel : On extrait uniquement les bits entre 350µs et 2000µs

Le flag obtenu est de la forme FCSC{séquence_de_bits}.

La difficulté principale réside dans la compréhension du protocole de communication synchrone et la gestion correcte des transitions de signaux.

Script complet

from pyDigitalWaveTools.vcd.parser import VcdParser
from queue import PriorityQueue
from typing import Dict, List, Tuple, Iterator, Any


def find_signals(
    node: Dict[str, Any], path: List[str] = None
) -> List[Tuple[List[str], List[Tuple[int, str]]]]:
    """Trouve récursivement tous les signaux dans la structure de données"""
    if path is None:
        path = []
    signals = []
    current_path = path + [node.get("name", "")]

    if "data" in node:
        signals.append((current_path, node["data"]))

    for child in node.get("children", []):
        signals.extend(find_signals(child, current_path))

    return signals


def merge_signals_general(data: Dict[str, Any]) -> Iterator[Tuple[int, str, str, str]]:
    """
    Fusionne les signaux en un flux temporel ordonné.
    Retourne: (temps, valeur, nom_signal, chemin_complet)
    """
    all_signals = find_signals(data)

    if not all_signals:
        return

    pq = PriorityQueue()
    indices = {i: 0 for i in range(len(all_signals))}

    # Initialise avec les premières valeurs
    for i, (path, signal_data) in enumerate(all_signals):
        if signal_data:
            pq.put((signal_data[0][0], (signal_data[0][1], i, ".".join(path))))
            indices[i] += 1

    # Parcours ordonné des événements
    while not pq.empty():
        time, (value, signal_idx, path) = pq.get()

        if indices[signal_idx] < len(all_signals[signal_idx][1]):
            next_data = all_signals[signal_idx][1][indices[signal_idx]]
            pq.put((next_data[0], (next_data[1], signal_idx, path)))
            indices[signal_idx] += 1

        yield (time, value, path.split(".")[-1], path)


def print_merged_signals(data: Dict[str, Any], limit: int = 10) -> None:
    """
    Affiche les premiers événements des signaux fusionnés.

    Args:
        data: Données à traiter
        limit: Nombre maximum d'événements à afficher
    """
    print(f"{'Temps':>10} | {'Signal':^10} | {'Valeur':^6} | Chemin complet")
    print("-" * 70)

    for i, (time, value, signal_name, path) in enumerate(merge_signals_general(data)):
        if i >= limit:
            print("...")
            break
        print(f"{time:>10} | {signal_name:^10} | {value:^6} | {path}")


def extract_signal(
    data: Dict[str, Any], start_time: int = None, end_time: int = None
) -> str:
    """
    Extrait les bits de D0 échantillonnés sur les fronts descendants de D1
    entre start_time et end_time (en µs)
    """
    d0, d0_prev = None, None
    d1, d1_prev = None, None
    result = []

    for time, value, signal_name, path in merge_signals_general(data):
        # Conversion ns -> µs
        time_us = time / 1000

        # Mise à jour des signaux
        if signal_name == "D0":
            d0, d0_prev = value, d0
        elif signal_name == "D1":
            d1, d1_prev = value, d1

        # Ignore avant l'intervalle
        if start_time is not None and time_us < start_time:
            continue

        # Arrête après l'intervalle
        if end_time is not None and time_us > end_time:
            break

        # Échantillonnage sur front descendant de D1
        if signal_name == "D1" and d1_prev == "1" and d1 == "0":
            if d0 is not None:
                result.append(d0)

    return "".join(result)


def main():
    # Lecture du fichier VCD
    fname = "very-cute-data.vcd"
    with open(fname) as fd:
        vcd = VcdParser()
        vcd.parse(fd)
    data = vcd.scope.toJson()

    print_merged_signals(data, limit=10)

    # Test sur l'exemple donné (0-350µs)
    test_bits = extract_signal(data, 0, 350)
    assert test_bits == "1000100010"
    print(f"Test (0-350µs): FCSC{{{test_bits}}}")  # Devrait donner FCSC{1000100010}

    # Extraction et affichage du flag
    flag_bits = extract_signal(data, 350, 2_000)
    flag = f"FCSC{{{flag_bits}}}"
    print(flag)


if __name__ == "__main__":
    main()

Exécution

$ python very-cute-data-script.py
     Temps |   Signal   | Valeur | Chemin complet
----------------------------------------------------------------------
         0 |     D0     |   1    | root.logic.D0
         0 |     D1     |   1    | root.logic.D1
     22875 |     D1     |   0    | root.logic.D1
     27312 |     D0     |   0    | root.logic.D0
     38375 |     D1     |   1    | root.logic.D1
     44375 |     D0     |   1    | root.logic.D0
     50250 |     D0     |   0    | root.logic.D0
     54937 |     D1     |   0    | root.logic.D1
     71437 |     D1     |   1    | root.logic.D1
     76125 |     D1     |   0    | root.logic.D1
...
Test (0-350µs): FCSC{1000100010}
FCSC{0000000011000000000000000000000001000100010000011}