Skip to content
Snippets Groups Projects
Commit 008d6365 authored by Jonas Arnhold's avatar Jonas Arnhold
Browse files

Make ZContext a singleton

parent 176e8b79
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ public class EdgeService {
public static void main(String[] args) {
registerShutdownHook(); // close ZContext during shutdown
ExecutorService executor = Executors.newFixedThreadPool(3); // assuming we need 3 threads in total
ConcurrentLinkedQueue<SensorData> messageBuffer = new ConcurrentLinkedQueue<>();
......@@ -21,4 +22,9 @@ public class EdgeService {
executor.submit(messageSender);
}
private static void registerShutdownHook() {
Thread printingHook = new Thread(ZContextProvider::close);
Runtime.getRuntime().addShutdownHook(printingHook);
}
}
......@@ -10,8 +10,8 @@ import org.zeromq.ZMQ;
@RequiredArgsConstructor
public class SensorDataCollector implements Runnable {
private static final String USAGE_TOPIC_PORT = "5555";
private static final String TEMPERATURE_TOPIC_PORT = "5556";
private static final int USAGE_TOPIC_PORT = 5555;
private static final int TEMPERATURE_TOPIC_PORT = 5556;
private static final String USAGE_TOPIC = "USAGE";
private static final String TEMPERATURE_TOPIC = "TEMPERATURE";
......@@ -19,21 +19,23 @@ public class SensorDataCollector implements Runnable {
@Override
public void run() {
try (ZContext context = new ZContext()) {
ZMQ.Socket usageSub = context.createSocket(SocketType.SUB);
usageSub.connect("tcp://*:%s".formatted(USAGE_TOPIC_PORT));
usageSub.subscribe(USAGE_TOPIC.getBytes());
ZMQ.Socket temperatureSub = context.createSocket(SocketType.SUB);
temperatureSub.connect("tcp://*:%s".formatted(TEMPERATURE_TOPIC_PORT));
temperatureSub.subscribe(TEMPERATURE_TOPIC.getBytes());
try (ZMQ.Socket usageSub = createSubscriber(USAGE_TOPIC_PORT, USAGE_TOPIC);
ZMQ.Socket temperatureSub = createSubscriber(TEMPERATURE_TOPIC_PORT, TEMPERATURE_TOPIC);)
{
while (!Thread.currentThread().isInterrupted()) {
// we do sync read of both sensors
String usageData = usageSub.recvStr().substring(USAGE_TOPIC.length() + 1);
String temperatureData = temperatureSub.recvStr().substring(TEMPERATURE_TOPIC.length() + 1);
messagebuffer.offer(new SensorData(usageData, temperatureData));
}
}
}
private static ZMQ.Socket createSubscriber(int port, String topic) {
ZContext context = ZContextProvider.getInstance();
ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
subscriber.connect("tcp://*:%d".formatted(port));
subscriber.subscribe(topic.getBytes());
return subscriber;
}
}
package com.fogcomputing;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.zeromq.ZContext;
@Getter
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ZContextProvider {
private static ZContext instance;
public static ZContext getInstance() {
if (instance == null) {
instance = new ZContext();
}
return instance;
}
public static void close() {
if (instance != null) {
instance.close();
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment