Skip to content
Snippets Groups Projects
Commit f21df38d authored by Jonas Arnhold's avatar Jonas Arnhold
Browse files

Fix message sending to cloud service

parent dde38ddb
No related branches found
No related tags found
No related merge requests found
package com.fogcomputing; package com.fogcomputing;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import picocli.CommandLine; import picocli.CommandLine;
public class CloudService implements Callable<Void> { public class CloudService implements Callable<Void> {
...@@ -11,12 +14,28 @@ public class CloudService implements Callable<Void> { ...@@ -11,12 +14,28 @@ public class CloudService implements Callable<Void> {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
try (ZMQ.Socket responder = ZContextProvider.getInstance().createSocket(SocketType.REP)) {
boolean connected = false;
while (!connected) {
connected = responder.connect("tcp://localhost:%s".formatted(cloudServicePort));
System.out.printf("Was not able to init cloud service on port %s...%n", cloudServicePort);
ThreadUtils.sleep(5, TimeUnit.SECONDS);
}
System.out.printf("Cloud service listening on port %s...%n", cloudServicePort);
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from client
String string = responder.recvStr(0);
System.out.printf("Received request: [%s]\n", string);
responder.send("OK");
}
}
return null; return null;
} }
public static void main(String[] args) { public static void main (String[]args){
int exitCode = new CommandLine(new CloudService()).execute(args); int exitCode = new CommandLine(new CloudService()).execute(args);
System.exit(exitCode); System.exit(exitCode);
} }
} }
File added
package com.fogcomputing; package com.fogcomputing;
import java.io.Serializable;
public record SensorData( public record SensorData(
int temperature, int temperature,
int usage int usage
) { )
implements Serializable
{
public SensorData(String temperature, String usage) { public SensorData(String temperature, String usage) {
this(Integer.parseInt(temperature), Integer.parseInt(usage)); this(Integer.parseInt(temperature), Integer.parseInt(usage));
} }
@Override @Override
......
package com.fogcomputing; package com.fogcomputing;
import java.io.Serializable;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.List; import java.util.List;
...@@ -7,7 +8,9 @@ public record SensorDataBatch( ...@@ -7,7 +8,9 @@ public record SensorDataBatch(
List<SensorData> sensorData, List<SensorData> sensorData,
int size, int size,
Timestamp timestamp Timestamp timestamp
) { )
implements Serializable
{
public SensorDataBatch(List<SensorData> sensorData, Timestamp timestamp) { public SensorDataBatch(List<SensorData> sensorData, Timestamp timestamp) {
this(sensorData, sensorData.size(), timestamp); this(sensorData, sensorData.size(), timestamp);
} }
......
No preview for this file type
No preview for this file type
...@@ -24,13 +24,17 @@ public class EdgeService implements Callable<Void> { ...@@ -24,13 +24,17 @@ public class EdgeService implements Callable<Void> {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
registerShutdownHook(); // close ZContext during shutdown registerShutdownHook(ZContextProvider::close); // close ZContext during shutdown
ExecutorService executor = Executors.newFixedThreadPool(3); // assuming we need 3 threads in total ExecutorService executor = Executors.newFixedThreadPool(3); // assuming we need 3 threads in total
ConcurrentLinkedQueue<SensorData> messageBuffer = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue<SensorData> messageBuffer = new ConcurrentLinkedQueue<>();
startSensordataCollector(messageBuffer, executor); startSensordataCollector(messageBuffer, executor);
startMessageSender(messageBuffer, executor);
// startMessageReceiver(executor); ZMQ.Socket messageSender = startMessageSender(messageBuffer, executor);
registerShutdownHook(messageSender::close);
// ZMQ.Socket messageReceiver = startMessageReceiver(executor);
// registerShutdownHook(messageReceiver::close);
latch.await(); // keep the main thread alive latch.await(); // keep the main thread alive
return null; return null;
...@@ -41,32 +45,30 @@ public class EdgeService implements Callable<Void> { ...@@ -41,32 +45,30 @@ public class EdgeService implements Callable<Void> {
executor.submit(sensorDataCollector); executor.submit(sensorDataCollector);
} }
private void startMessageSender(ConcurrentLinkedQueue<SensorData> messageBuffer, ExecutorService executor) { private ZMQ.Socket startMessageSender(ConcurrentLinkedQueue<SensorData> messageBuffer, ExecutorService executor) {
try (ZMQ.Socket cloudSocket = ZContextProvider.getInstance().createSocket(SocketType.REQ)) ZMQ.Socket cloudSocket = ZContextProvider.getInstance().createSocket(SocketType.REQ);
{ boolean connected = false;
boolean connected = false; while (!connected) {
while (!connected) { connected = cloudSocket.connect("tcp://%s".formatted(cloudServerAddress));
connected = cloudSocket.connect("tcp://%s".formatted(cloudServerAddress)); System.out.printf("Was not able to connect to cloud server on %s...%n", cloudServerAddress);
System.out.printf("Was not able to connect to cloud server on %s...%n", cloudServerAddress); ThreadUtils.sleep(5, TimeUnit.SECONDS);
ThreadUtils.sleep(5, TimeUnit.SECONDS);
}
MessageSender messageSender = new MessageSender(messageBuffer, cloudSocket);
executor.submit(messageSender);
} }
MessageSender messageSender = new MessageSender(messageBuffer, cloudSocket);
executor.submit(messageSender);
return cloudSocket;
} }
private void startMessageReceiver(ExecutorService executor) { private ZMQ.Socket startMessageReceiver(ExecutorService executor) {
try (ZMQ.Socket cloudSocket = ZContextProvider.getInstance().createSocket(SocketType.REP)) ZMQ.Socket cloudSocket = ZContextProvider.getInstance().createSocket(SocketType.REP);
{ boolean connected = false;
boolean connected = false; while (!connected) {
while (!connected) { connected = cloudSocket.connect("tcp://localhost:%s".formatted(edgeServicePort));
connected = cloudSocket.connect("tcp://localhost:%s".formatted(edgeServicePort)); System.out.println("Was not able to start message receiver on localhost port %s...");
System.out.println("Was not able to start message receiver on localhost port %s..."); ThreadUtils.sleep(5, TimeUnit.SECONDS);
ThreadUtils.sleep(5, TimeUnit.SECONDS);
}
MessageReceiver messageReceiver = new MessageReceiver(cloudSocket);
executor.submit(messageReceiver);
} }
MessageReceiver messageReceiver = new MessageReceiver(cloudSocket);
executor.submit(messageReceiver);
return cloudSocket;
} }
public static void main(String[] args) { public static void main(String[] args) {
...@@ -74,8 +76,8 @@ public class EdgeService implements Callable<Void> { ...@@ -74,8 +76,8 @@ public class EdgeService implements Callable<Void> {
System.exit(exitCode); System.exit(exitCode);
} }
private static void registerShutdownHook() { private static void registerShutdownHook(Runnable runnable) {
Thread printingHook = new Thread(ZContextProvider::close); Thread printingHook = new Thread(runnable);
Runtime.getRuntime().addShutdownHook(printingHook); Runtime.getRuntime().addShutdownHook(printingHook);
} }
} }
...@@ -48,7 +48,7 @@ public class MessageSender implements Runnable { ...@@ -48,7 +48,7 @@ public class MessageSender implements Runnable {
Timestamp currentDateTime = new Timestamp(System.currentTimeMillis()); Timestamp currentDateTime = new Timestamp(System.currentTimeMillis());
SensorDataBatch sensorDataBatch = new SensorDataBatch(batch, currentDateTime); SensorDataBatch sensorDataBatch = new SensorDataBatch(batch, currentDateTime);
System.out.printf("Batch ready to sent: %s\n", sensorDataBatch); System.out.printf("Batch ready to sent: %s\n", sensorDataBatch);
// trySendToCloud(sensorDataBatch); // not tested yet trySendToCloud(sensorDataBatch);
ThreadUtils.sleep(15, TimeUnit.SECONDS); ThreadUtils.sleep(15, TimeUnit.SECONDS);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment