Skip to content
Snippets Groups Projects
Commit 4bc87f34 authored by Jonas Arnhold's avatar Jonas Arnhold
Browse files

Introduce MessageSender and SensorDataCollector

parent af475b84
No related branches found
No related tags found
No related merge requests found
......@@ -18,5 +18,10 @@
<artifactId>jeromq</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
</dependencies>
</project>
package com.fogcomputing;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class EdgeService {
private static final String USAGE_TOPIC = "USAGE";
private static final String TEMPERATURE_TOPIC = "TEMPERATURE";
public static void main(String[] args) {
try (ZContext context = new ZContext()) {
ZMQ.Socket usageSub = context.createSocket(SocketType.SUB);
usageSub.connect("tcp://*:5555");
usageSub.subscribe(USAGE_TOPIC.getBytes());
ZMQ.Socket temperatureSub = context.createSocket(SocketType.SUB);
temperatureSub.connect("tcp://*:5556");
temperatureSub.subscribe(TEMPERATURE_TOPIC.getBytes());
while (true) {
String usageData = usageSub.recvStr();
String temperatureData = temperatureSub.recvStr();
System.out.println(usageData);
System.out.println(temperatureData);
}
}
// assuming we need 3 threads in total
ExecutorService executor = Executors.newFixedThreadPool(3);
// using String and not a custom object for a first draft
ConcurrentLinkedQueue<String> outboundMessages = new ConcurrentLinkedQueue<>();
SensorDataCollector sensorDataCollector = new SensorDataCollector(outboundMessages);
executor.submit(sensorDataCollector);
MessageSender messageSender = new MessageSender(outboundMessages);
executor.submit(messageSender);
}
}
package com.fogcomputing;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class MessageSender implements Runnable {
private final ConcurrentLinkedQueue<String> outboundMessages;
@Override
public void run() {
while (true) {
while (outboundMessages.peek() != null) {
String message = outboundMessages.poll();
System.out.println("Read message from queue: " + message);
// TODO send message to Cloud service
}
}
}
}
package com.fogcomputing;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.RequiredArgsConstructor;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
@RequiredArgsConstructor
public class SensorDataCollector implements Runnable {
private static final String USAGE_TOPIC_PORT = "5555";
private static final String TEMPERATURE_TOPIC_PORT = "5556";
private static final String USAGE_TOPIC = "USAGE";
private static final String TEMPERATURE_TOPIC = "TEMPERATURE";
private final ConcurrentLinkedQueue<String> messageSink;
@Override
public void run() {
try (ZContext context = new ZContext()) {
ZMQ.Socket usageSub = context.createSocket(SocketType.SUB);
usageSub.connect("tcp://*:%s".formatted(USAGE_TOPIC_PORT));
usageSub.subscribe(USAGE_TOPIC.getBytes());
ZMQ.Socket temperatureSub = context.createSocket(SocketType.SUB);
temperatureSub.connect("tcp://*:%s".formatted(TEMPERATURE_TOPIC_PORT));
temperatureSub.subscribe(TEMPERATURE_TOPIC.getBytes());
while (true) {
String usageData = usageSub.recvStr();
String temperatureData = temperatureSub.recvStr();
messageSink.offer(usageData);
messageSink.offer(temperatureData);
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment