High-Performance Multi-Core Processing
This example shows how to scale capture across multiple CPU cores using jNetWorks' built-in load balancing.
Features demonstrated:
Multi-stream creation with naming pattern
5-tuple hashing for flow affinity
Frame slicing for header-only processing
Parallel task forking with
TaskScope
/*
* Licensed under the Apache License, Version 2.0 - see http://www.apache.org/licenses/LICENSE-2.0
*/
package com.slytechs.sdk.jetnetworks;
import java.time.Duration;
import com.slytechs.sdk.common.session.SessionShutdownException;
import com.slytechs.sdk.jnetworks.Net;
import com.slytechs.sdk.jnetworks.NetException;
import com.slytechs.sdk.jnetworks.PacketStream;
import com.slytechs.sdk.jnetworks.PacketStreamSettings;
import com.slytechs.sdk.jnetworks.config.HashType;
import com.slytechs.sdk.jnetworks.pcap.PcapBackend;
import com.slytechs.sdk.jnetworks.task.TaskScope;
import com.slytechs.sdk.protocol.core.Packet;
import com.slytechs.sdk.protocol.core.stack.ProtocolStack;
public class MultiCoreProcessing {
public static void main(String[] args) {
new MultiCoreProcessing().run();
}
public void run() {
Net.activateLicense();
try (Net net = new PcapBackend()) {
ProtocolStack stack = new ProtocolStack();
PacketStreamSettings settings = new PacketStreamSettings();
// Create 4 parallel streams (one per core)
PacketStream[] streams = net.createPacketStreams("icmp-stream-%d", 4, settings, stack);
// Capture ICMP from specific interfaces with load balancing
net.capture("en0", "en1")
.filter("icmp")
.pipe(streams)
.hash(HashType.HASH_5_TUPLE) // Preserve flow affinity
.slice(64) // Capture only first 64 bytes (headers)
.apply();
try (TaskScope scope = new TaskScope(net)) {
scope.shutdownAfter(Duration.ofMinutes(1));
// Fork a worker task for each stream
scope.fork(streams, this::processStream)
.awaitCompletion();
}
} catch (NetException | InterruptedException e) {
e.printStackTrace();
}
}
private void processStream(PacketStream stream) {
while (stream.isActive()) {
try {
Packet packet = stream.take();
// Process packet (e.g., count, analyze headers, etc.)
System.out.printf("Stream %s: ICMP packet captured (caplen=%d)%n",
stream.name(), packet.capLength());
stream.release(packet);
} catch (SessionShutdownException | InterruptedException e) {
// Graceful exit
return;
}
}
}
}Key Concepts Demonstrated
createPacketStreams("name-%d", N) – Automatic parallel stream creation
hash(HashType.HASH_5_TUPLE) – Ensures packets from the same flow go to the same core
slice(64) – Reduces memory bandwidth by capturing only needed bytes
scope.fork(...).awaitCompletion() – Clean, structured parallelism
Last updated