High-Performance Multi-Core Processing

This example shows how to scale capture across multiple CPU cores using jNetWorks' built-in load balancing.

Features demonstrated:

  • Multi-stream creation with naming pattern

  • 5-tuple hashing for flow affinity

  • Frame slicing for header-only processing

  • Parallel task forking with TaskScope

/* 
 * Licensed under the Apache License, Version 2.0 - see http://www.apache.org/licenses/LICENSE-2.0
 */
package com.slytechs.sdk.jnetworks;

import java.time.Duration;

import com.slytechs.sdk.common.session.SessionShutdownException;
import com.slytechs.sdk.jnetworks.Net;
import com.slytechs.sdk.jnetworks.NetException;
import com.slytechs.sdk.jnetworks.PacketStream;
import com.slytechs.sdk.jnetworks.PacketStreamSettings;
import com.slytechs.sdk.jnetworks.config.HashType;
import com.slytechs.sdk.jnetworks.pcap.PcapBackend;
import com.slytechs.sdk.jnetworks.task.TaskScope;
import com.slytechs.sdk.protocol.core.Packet;
import com.slytechs.sdk.protocol.core.stack.ProtocolStack;

public class MultiCoreProcessing {

    public static void main(String[] args) {
        new MultiCoreProcessing().run();
    }

    public void run() {

        Net.activateLicense();

        try (Net net = new PcapBackend()) {

            ProtocolStack stack = new ProtocolStack();
            PacketStreamSettings settings = new PacketStreamSettings();

            // Create 4 parallel streams (one per core)
            PacketStream[] streams = net.createPacketStreams("icmp-stream-%d", 4, settings, stack);

            // Capture ICMP from specific interfaces with load balancing
            net.capture("en0", "en1")
                    .filter("icmp")
                    .pipe(streams)
                    .hash(HashType.HASH_5_TUPLE)  // Preserve flow affinity
                    .slice(64)                   // Capture only first 64 bytes (headers)
                    .apply();

            try (TaskScope scope = new TaskScope(net)) {
                scope.shutdownAfter(Duration.ofMinutes(1));

                // Fork a worker task for each stream
                scope.fork(streams, this::processStream)
                        .awaitCompletion();
            }

        } catch (NetException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void processStream(PacketStream stream) {
        while (stream.isActive()) {
            try {
                Packet packet = stream.take();

                // Process packet (e.g., count, analyze headers, etc.)
                System.out.printf("Stream %s: ICMP packet captured (caplen=%d)%n",
                        stream.name(), packet.capLength());

                stream.release(packet);

            } catch (SessionShutdownException | InterruptedException e) {
                // Graceful exit
                return;
            }
        }
    }
}
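
The worker-per-stream pattern used by processStream can be tried without any capture hardware. The following is a minimal sketch using only the standard java.util.concurrent classes; it is not the TaskScope API. The class name ForkPerStreamSketch, the END marker, and the use of string queues in place of packet streams are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-JDK illustration of "one worker per stream"; not the jNetWorks TaskScope API. */
final class ForkPerStreamSketch {

    /** Hypothetical end-of-stream marker standing in for a session shutdown. */
    static final String END = "<end>";

    /** Worker loop: blocks on take() and counts items until the END marker arrives. */
    static int drain(BlockingQueue<String> queue) throws InterruptedException {
        int count = 0;
        while (true) {
            String item = queue.take();
            if (END.equals(item)) {
                return count;
            }
            count++; // stand-in for real per-packet work
        }
    }

    /** Starts one worker per queue, then waits for every worker to finish. */
    static int[] forkAndAwait(List<BlockingQueue<String>> queues) {
        // One thread per queue, so a blocked worker never starves another.
        ExecutorService pool = Executors.newFixedThreadPool(queues.size());
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (BlockingQueue<String> queue : queues) {
                Callable<Integer> worker = () -> drain(queue);
                futures.add(pool.submit(worker));
            }
            int[] counts = new int[futures.size()];
            for (int i = 0; i < counts.length; i++) {
                counts[i] = futures.get(i).get(); // blocks until worker i completes
            }
            return counts;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while awaiting workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("worker failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> first = new LinkedBlockingQueue<>(List.of("a", "b", "c", END));
        BlockingQueue<String> second = new LinkedBlockingQueue<>(List.of("d", END));
        System.out.println(Arrays.toString(forkAndAwait(List.of(first, second)))); // prints [3, 1]
    }
}
```

Each worker owns exactly one queue, so no locking is needed inside the loop. The same property is what makes per-stream workers attractive for packet processing.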

Key Concepts Demonstrated

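  • createPacketStreams("name-%d", N, settings, stack) – Creates N parallel streams in one call, named from the pattern

  • hash(HashType.HASH_5_TUPLE) – Sends every packet of a flow to the same stream, so a single worker sees the whole flow

  • slice(64) – Truncates each captured packet to its first 64 bytes, reducing memory bandwidth when only headers are needed

  • scope.fork(...).awaitCompletion() – Forks one worker task per stream and waits until all of them finish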
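
The flow-affinity idea behind hash(HashType.HASH_5_TUPLE) can be illustrated with a small, self-contained sketch. It is not how jNetWorks or any capture backend computes its hash; the class FlowHashSketch, the FiveTuple record, and the choice of Arrays.hashCode as the hash function are assumptions made purely for illustration. The record syntax requires Java 16 or later.

```java
import java.util.Arrays;

/** Illustrative only; not how jNetWorks or any capture hardware computes its hash. */
final class FlowHashSketch {

    /** The five header fields that identify a flow (IPv4 addresses as ints). */
    record FiveTuple(int srcIp, int dstIp, int srcPort, int dstPort, int protocol) {}

    /** Maps a flow to a stream index in [0, streamCount). */
    static int streamIndex(FiveTuple flow, int streamCount) {
        int hash = Arrays.hashCode(new int[] {
                flow.srcIp(), flow.dstIp(), flow.srcPort(), flow.dstPort(), flow.protocol() });
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(hash, streamCount);
    }

    public static void main(String[] args) {
        FiveTuple first = new FiveTuple(0x0A000001, 0x0A000002, 40000, 443, 6);
        FiveTuple again = new FiveTuple(0x0A000001, 0x0A000002, 40000, 443, 6);
        // Equal tuples always produce the same index, so a flow stays on one stream.
        System.out.println(streamIndex(first, 4) == streamIndex(again, 4)); // prints true
    }
}
```

Two caveats apply to this sketch. It is direction-sensitive: swapping source and destination may select a different stream, and whether HASH_5_TUPLE behaves symmetrically is not stated here. Also, ICMP carries no port numbers, so for the "icmp" filter used in this example the port fields contribute nothing; how a given hash implementation fills them in is backend-specific.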
