Skip to content
Snippets Groups Projects
Commit 4b27c116 authored by Jonas Arnhold's avatar Jonas Arnhold
Browse files

Make messages a little more clever

parent d4eedcb4
No related branches found
No related tags found
No related merge requests found
...@@ -2,15 +2,17 @@ package com.fogcomputing; ...@@ -2,15 +2,17 @@ package com.fogcomputing;
import java.io.Serializable; import java.io.Serializable;
import java.sql.Timestamp;
public record SensorData( public record SensorData(
int temperature, int temperature,
int usage int usage,
Timestamp dateTime
) )
implements Serializable implements Serializable
{ {
public SensorData(String temperature, String usage) { public SensorData(String temperature, String usage, Timestamp dateTime) {
this(Integer.parseInt(temperature), Integer.parseInt(usage)); this(Integer.parseInt(temperature), Integer.parseInt(usage), dateTime);
} }
@Override @Override
......
...@@ -5,12 +5,18 @@ import java.sql.Timestamp; ...@@ -5,12 +5,18 @@ import java.sql.Timestamp;
public record SensorDataAggregation( public record SensorDataAggregation(
int averageTemperature, int averageTemperature,
int averageUsage, int averageUsage,
Timestamp timestamp Timestamp first,
Timestamp last
) )
implements Message implements Message
{ {
@Override @Override
public String toString() { public String toString() {
return "SensorDataAggregation %s [averageTemperature=%d, averageUsage=%d]".formatted(timestamp, averageTemperature, averageUsage); return "SensorDataAggregation [%s - %s] [averageTemperature=%d, averageUsage=%d]".formatted(
TimestampFormatter.timeOnly(first),
TimestampFormatter.timeOnly(last),
averageTemperature,
averageUsage
);
} }
} }
...@@ -6,16 +6,37 @@ import java.util.List; ...@@ -6,16 +6,37 @@ import java.util.List;
public record SensorDataBatch( public record SensorDataBatch(
List<SensorData> sensorData, List<SensorData> sensorData,
int size, int size,
Timestamp timestamp Timestamp first,
Timestamp last
) )
implements Message { implements Message
public SensorDataBatch(List<SensorData> sensorData, Timestamp timestamp) { {
this(sensorData, sensorData.size(), timestamp); public static SensorDataBatch of(List<SensorData> sensorData) {
Timestamp first;
Timestamp last;
if (sensorData.isEmpty()) {
Timestamp now = new Timestamp(System.currentTimeMillis());
first = now;
last = now;
}
else {
first = sensorData.get(0).dateTime();
last = sensorData.get(sensorData.size() - 1).dateTime();
}
return new SensorDataBatch(sensorData, sensorData.size(), first, last);
} }
@Override @Override
public String toString() { public String toString() {
return "SensorDataBatch: size = %s, data[Temp,Usage] = %s, timestamp = %s".formatted(size, sensorData, timestamp); return "SensorDataBatch: size = %s, time range = [%s-%s] data[Temp,Usage] = %s".formatted(
size,
TimestampFormatter.timeOnly(first),
TimestampFormatter.timeOnly(last),
sensorData
);
} }
public SensorDataAggregation aggregate() { public SensorDataAggregation aggregate() {
...@@ -29,6 +50,6 @@ public record SensorDataBatch( ...@@ -29,6 +50,6 @@ public record SensorDataBatch(
.average() .average()
.orElse(0); .orElse(0);
return new SensorDataAggregation((int) averageTemperature, (int) averageUsage, timestamp); return new SensorDataAggregation((int) averageTemperature, (int) averageUsage, first, last);
} }
} }
package com.fogcomputing;
import java.sql.Timestamp;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
public class TimestampFormatter {
public static String timeOnly(Timestamp timestamp) {
return timestamp.toInstant()
.atOffset(ZoneOffset.UTC )
.format(DateTimeFormatter.ofPattern("HH:mm:ss" ));
}
}
package com.fogcomputing; package com.fogcomputing;
import java.io.IOException; import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
...@@ -9,7 +8,7 @@ import java.util.concurrent.TimeUnit; ...@@ -9,7 +8,7 @@ import java.util.concurrent.TimeUnit;
public class MessageSender implements Runnable { public class MessageSender implements Runnable {
private static final int DEFAULT_BATCH_SIZE = 15; // we send every 15 seconds, sensor data is written/collected every second private static final int BATCH_SIZE = 15; // we send every 15 seconds, sensor data is written/collected every second
private final ConcurrentLinkedQueue<SensorData> messageBuffer; private final ConcurrentLinkedQueue<SensorData> messageBuffer;
private final Client client; private final Client client;
...@@ -23,18 +22,22 @@ public class MessageSender implements Runnable { ...@@ -23,18 +22,22 @@ public class MessageSender implements Runnable {
public void run() { public void run() {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
List<SensorData> batch = new ArrayList<>(DEFAULT_BATCH_SIZE); List<SensorData> batch = new ArrayList<>(BATCH_SIZE);
while (messageBuffer.peek() != null) { while (batch.size() < BATCH_SIZE && messageBuffer.peek() != null) {
SensorData sensorData = messageBuffer.poll(); SensorData sensorData = messageBuffer.poll();
batch.add(sensorData); batch.add(sensorData);
} }
Timestamp currentDateTime = new Timestamp(System.currentTimeMillis()); // send 1-x batches until buffer is empty, than sleep for 15 seconds
SensorDataBatch sensorDataBatch = new SensorDataBatch(batch, currentDateTime); // this results in sending a batch of 15 SensorData every 15 seconds if server is working properly
if (batch.isEmpty()) {
ThreadUtils.sleep(15, TimeUnit.SECONDS);
continue;
}
SensorDataBatch sensorDataBatch = SensorDataBatch.of(batch);
byte[] response = client.trySend(sensorDataBatch); byte[] response = client.trySend(sensorDataBatch);
handleResponse(response); handleResponse(response);
ThreadUtils.sleep(15, TimeUnit.SECONDS);
} }
} }
......
package com.fogcomputing; package com.fogcomputing;
import java.sql.Timestamp;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
...@@ -26,7 +27,8 @@ public class SensorDataCollector implements Runnable { ...@@ -26,7 +27,8 @@ public class SensorDataCollector implements Runnable {
// we do sync read of both sensors // we do sync read of both sensors
String usageData = usageSub.recvStr().substring(USAGE_TOPIC.length() + 1); String usageData = usageSub.recvStr().substring(USAGE_TOPIC.length() + 1);
String temperatureData = temperatureSub.recvStr().substring(TEMPERATURE_TOPIC.length() + 1); String temperatureData = temperatureSub.recvStr().substring(TEMPERATURE_TOPIC.length() + 1);
messageBuffer.offer(new SensorData(usageData, temperatureData)); Timestamp dateTime = new Timestamp(System.currentTimeMillis());
messageBuffer.offer(new SensorData(usageData, temperatureData, dateTime));
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment