In the previous part, we looked at the simulator responsible for sending location update events in the sample application. This time, we will focus on the Function App sitting at the center of the architecture.
Updating latest locations and checking geofences
We have three things that we want to do for each location update event:
- Update the latest location for the device and notify listeners
- Check if the new location falls inside a geofence the device was not previously in and notify listeners (and do the same check for geofence exits)
- Write the event to Azure Data Explorer (ADX)
In the sample app, I chose to create three consumer groups in the Event Hub for this purpose. This allows two different Functions to listen to the same Event Hub and receive the same events. In addition, ADX can listen for those events and write them to its table.
Something that I considered important for both functions (not so much for ADX) is that we don't handle events for the same device simultaneously as that would cause nasty race conditions. With IoT Hub + Event Hub this is surprisingly easy as the partition key used is the device ID in IoT Hub. So events for one device will always go to the same partition in Event Hub. And since we can't have two listeners on the same partition within one consumer group, we can be sure that only one Function App instance is receiving a device's events at any time.
There is one trap that you might hit here though. You should accept an array of events in your Function App, not a single event. The Event Hubs extension will by default call your function in parallel with the events it receives if your Function accepts a single event. You could change the settings in host.json to fetch batches of 1 event, but that would be very slow and inefficient. So instead we receive the whole batch in the function, where we can then choose to parallelise the processing.
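As a minimal sketch of what this looks like (assuming the in-process model and the newer Azure.Messaging.EventHubs-based extension; the function, hub, and consumer group names here are illustrative, not the sample app's exact code), the trigger binds to an array of events:
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.WebJobs;
[FunctionName("UpdateLatestLocations")]
public async Task Run(
    [EventHubTrigger("locations", Connection = "EventHubConnection",
        ConsumerGroup = "latest-locations")] EventData[] events)
{
    // The extension hands us the whole batch; we can process it
    // sequentially or parallelise per device as we see fit.
    foreach (var ev in events)
    {
        // ... deserialize ev.EventBody and handle the location update
    }
}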
Something to keep in mind when processing Event Hub events is that, by default, they will not be retried if processing fails. Since Event Hub is not a queue but an event log, it works a bit differently: events are not removed from Event Hub after reading them; the checkpoint for the consumer group is just moved forward. If you want retries for events, check out Azure Functions error handling and retries.
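If we wanted retries here, one option (a sketch, not part of the sample app) is a fixed-delay retry policy declared with an attribute on the function:
[FixedDelayRetry(maxRetryCount: 5, delayInterval: "00:00:10")]
[FunctionName("UpdateLatestLocations")]
public async Task Run(/* ... same Event Hub trigger as above ... */)
{
    // If processing throws, the runtime re-invokes the function with the
    // same batch up to five times, ten seconds apart, before checkpointing past it.
}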
Latest location update
Updating the latest location in SQL DB in the sample app is pretty simple. I used the Dapper library for this:
var trackerId = ev.Id;
var location = SqlGeography.Point(latitude: ev.Lat, longitude: ev.Lng, srid: 4326);
await SqlConnection.ExecuteAsync(
@"UPDATE [dbo].[LocationTrackers]
SET [LatestLocation] = @location
WHERE [Id] = @id",
new
{
id = trackerId,
location,
},
transaction);
This is one of those points where you can very easily put the latitude and longitude the wrong way around in a variable. GeoJSON, for example, uses longitude, latitude order, while this type uses latitude, longitude. But since we have them in named fields in the event and the factory method uses parameters named latitude and longitude, we can ensure they are correct quite easily.
A point where it isn't as easy is when you work with GeoJSON:
var feature = new Feature(polygon, id: geofence.Id.ToString(), properties: new Dictionary<string, object>
{
// GeoJSON uses longitude, latitude order.
["center"] = new[] { center.Long.Value, center.Lat.Value },
["name"] = geofence.Name ?? ""
});
Since the coordinates are an array of two numbers here, we just need to put them in the correct order. I would recommend making a utility method for the conversion so that you won't have to hunt for the one place where they ended up the wrong way around.
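Such a helper could be as small as this (a hypothetical example, not code from the sample app):
// GeoJSON expects coordinates in longitude, latitude order.
public static double[] ToGeoJsonPosition(double latitude, double longitude)
    => new[] { longitude, latitude };
// Usage in the earlier snippet:
// ["center"] = ToGeoJsonPosition(center.Lat.Value, center.Long.Value)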
Now, what is that number 4326? That is the srid parameter (Spatial Reference Identifier). It identifies the coordinate reference system that you are using. The number references an EPSG identifier, 4326 being World Geodetic System 1984, the system used by GPS. It is extremely common for coordinates, to the point that it is the only one Azure Cosmos DB supports. There are thousands of coordinate systems in the EPSG registry, which is something to be aware of if you develop an application that handles coordinate data.
You can often convert between different coordinate systems, and in my opinion it is a good idea to store all coordinates in a single system. Though, just like with dates and times, you might need to record which system was used originally (or even store the original value as is) in cases where it matters.
In this sample app, we will keep it simple and use EPSG:4326 for the events as well as data storage. They will need conversion to EPSG:3857 for display in the map view, but that's something the UI control handles internally.
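For reference, the conversion from EPSG:4326 to EPSG:3857 boils down to the spherical Web Mercator projection. A minimal sketch (the UI control does this internally; this is not code from the sample app):
// Project a WGS 84 (EPSG:4326) coordinate onto Web Mercator (EPSG:3857), in meters.
const double EarthRadiusMeters = 6378137.0;
double x = EarthRadiusMeters * longitude * Math.PI / 180.0;
double y = EarthRadiusMeters * Math.Log(Math.Tan(Math.PI / 4.0 + latitude * Math.PI / 360.0));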
Geofence check
When deploying the sample application, the script inserts default geofences into the SQL database. These are polygons defined by a set of coordinates. A geofence is used to check when an entity enters or exits a particular area. These could be logistics hubs, fuel stations, or anything else.
Checking if a point is inside a polygon is quite simple, as we can have SQL DB do the heavy lifting. We can also take advantage of spatial indexes to make the process more efficient.
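For example, a spatial index on the geofence border column could be created along these lines (the index name is hypothetical; for a geography column SQL Server applies a default tessellation scheme):
CREATE SPATIAL INDEX [IX_Geofences_Border]
    ON [dbo].[Geofences] ([Border]);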
The sample app's geofence check Function does this with an SQL query:
var reader = await SqlConnection.QueryMultipleAsync(
@"SELECT [GeofenceId]
FROM [dbo].[LocationTrackersInGeofences]
WHERE [LocationTrackerId] = @trackerId AND [EntryTimestamp] < @eventTimestamp AND [ExitTimestamp] IS NULL;
SELECT [Id]
FROM [dbo].[Geofences]
WHERE [Border].STContains(@location) = 1",
new
{
trackerId,
eventTimestamp,
location,
},
transaction);
var currentGeofenceIdRows = await reader.ReadAsync();
var currentGeofenceIds = currentGeofenceIdRows.Select(r => (int)r.GeofenceId).ToList();
var collidingGeofenceIdRows = await reader.ReadAsync();
var collidingGeofenceIds = collidingGeofenceIdRows.Select(r => (int)r.Id).ToList();
var enteredGeofenceIds = collidingGeofenceIds.Except(currentGeofenceIds).ToArray();
var exitedGeofenceIds = currentGeofenceIds.Except(collidingGeofenceIds).ToArray();
Here the sample app does two queries at once:
- Get the geofences the device was in before this event
- Get the geofences that contain the device's latest location (this uses the STContains method in SQL Server)
By using these two pieces of information, we can deduce the geofences that the device has now entered, as well as the geofences it has now exited.
We then add rows for each entered geofence as well as update rows for each exited geofence:
if (enteredGeofenceIds.Length > 0)
{
await SqlConnection.ExecuteAsync(
@"INSERT INTO [dbo].[LocationTrackersInGeofences] ([LocationTrackerId], [GeofenceId], [EntryTimestamp])
VALUES (@trackerId, @geofenceId, @eventTimestamp)",
enteredGeofenceIds
.Select(id => new
{
trackerId,
geofenceId = id,
eventTimestamp,
})
.ToArray(),
transaction);
}
if (exitedGeofenceIds.Length > 0)
{
await SqlConnection.ExecuteAsync(
@"UPDATE [dbo].[LocationTrackersInGeofences]
SET [ExitTimestamp] = @eventTimestamp
WHERE [LocationTrackerId] = @trackerId AND [GeofenceId] IN @exitedGeofenceIds AND [ExitTimestamp] IS NULL",
new
{
eventTimestamp,
trackerId,
exitedGeofenceIds,
},
transaction);
}
You could also do similar checks in Cosmos DB with the ST_WITHIN function.
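For instance, a Cosmos DB query along these lines would return the geofences containing a point (the container and property names here are hypothetical; note the GeoJSON longitude, latitude order):
SELECT g.id
FROM geofences g
WHERE ST_WITHIN({ "type": "Point", "coordinates": [24.94, 60.17] }, g.border)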
Updating device parameters through Device Twin
In the front-end, a logged-in user can modify two device parameters:
- Speed (km/h)
- Location update interval (milliseconds)
These parameters are communicated to the device through IoT Hub's Device Twins:
using var registryManager = RegistryManager.Create(_iotHubHostName, _tokenCredential);
// Get current twin, we need it for its ETag
var twin = await registryManager.GetTwinAsync(trackerId.ToString());
// Create update
var twinPatch = new Twin();
twinPatch.Properties.Desired["speedKilometersPerHour"] = speedKilometersPerHour;
twinPatch.Properties.Desired["eventIntervalMillis"] = eventIntervalMillis;
// Update twin
await registryManager.UpdateTwinAsync(trackerId.ToString(), twinPatch, twin.ETag);
Note that we have to query the current version of the twin to get its ETag. We don't actually care about the current values here, but if you did, the ETag would let you implement an optimistic concurrency check and prevent lost updates. These updates are then received and handled in the simulator's Worker:
// Get initial parameters
var twin = await deviceClient.GetTwinAsync(stoppingToken);
int eventIntervalMillis = twin.Properties.Desired.GetIntegerOrDefault("eventIntervalMillis", DefaultEventIntervalMillis);
int speedKilometersPerHour = twin.Properties.Desired.GetIntegerOrDefault("speedKilometersPerHour", DefaultSpeedKilometersPerHour);
device.SetSpeed(speedKilometersPerHour);
// Report initial parameters as received
await deviceClient.UpdateReportedPropertiesAsync(new TwinCollection(JsonSerializer.Serialize(new
{
eventIntervalMillis,
speedKilometersPerHour,
})), stoppingToken);
// Start listening for updates to desired properties
await deviceClient.SetDesiredPropertyUpdateCallbackAsync(async (desiredProperties, _) =>
{
eventIntervalMillis = desiredProperties.GetIntegerOrDefault("eventIntervalMillis", DefaultEventIntervalMillis);
int updatedSpeedKilometersPerHour = desiredProperties.GetIntegerOrDefault("speedKilometersPerHour", DefaultSpeedKilometersPerHour);
device.SetSpeed(updatedSpeedKilometersPerHour);
// Report update as received
await deviceClient.UpdateReportedPropertiesAsync(new TwinCollection(JsonSerializer.Serialize(new
{
eventIntervalMillis,
speedKilometersPerHour = updatedSpeedKilometersPerHour,
})), stoppingToken);
}, null, stoppingToken);
The device twin is created with default values for the speed and event send interval. First, the worker gets those defaults and reports them as received. Then it can start listening for updates to desired properties. The listener updates the values and reports them as received.
Getting historical location data
If a vehicle is clicked in the front-end, we want to display some number of its previous locations on the map to show where it has been. Since we store historical data in Azure Data Explorer (ADX), we will need a Kusto Query Language (KQL) query:
public async Task<List<PastLocationDto>> GetTrackerRecentPastLocationsAsync(
Guid trackerId)
{
var connectionStringBuilder = new KustoConnectionStringBuilder(_adxClusterUri, _adxDbName)
.WithAadAzureTokenCredentialsAuthentication(_tokenCredential);
connectionStringBuilder.FederatedSecurity = true;
using var client = KustoClientFactory.CreateCslQueryProvider(connectionStringBuilder);
var adxQuery = $@"locations
| where DeviceId == '{trackerId}'
| where Timestamp > ago(10m)
| order by Timestamp desc
| project Longitude, Latitude, Timestamp
| take 20";
using var reader = await client.ExecuteQueryAsync(_adxDbName, adxQuery, new Kusto.Data.Common.ClientRequestProperties());
var results = new List<PastLocationDto>();
while (reader.Read())
{
var longitude = reader.GetDouble(0);
var latitude = reader.GetDouble(1);
var timestamp = reader.GetDateTime(2);
results.Add(new PastLocationDto
{
Latitude = latitude,
Longitude = longitude,
Timestamp = timestamp.ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff'Z'"),
});
}
return results;
}
The query is relatively simple:
- locations: the table where the data is stored
- where DeviceId == '{trackerId}': events for the given tracker (note we accept a Guid, so we don't have to worry about query injection here)
- where Timestamp > ago(10m): events from the last 10 minutes
- order by Timestamp desc: sort newest first
- project Longitude, Latitude, Timestamp: the fields we want, and in what order (this matters when reading the response)
- take 20: at most the latest 20 events
Reading each value uses indexes based on the order we specified in project:
var longitude = reader.GetDouble(0);
var latitude = reader.GetDouble(1);
var timestamp = reader.GetDateTime(2);
The types for the columns are set in the adx_setup.csl script that is run during deployment:
.create table locations (['DeviceId']:guid, ['Longitude']:real, ['Latitude']:real, ['Timestamp']:datetime)
Summary
Now we have an Azure Function that can take in events and process them. A part of this processing is sending SignalR events, but we will discuss that in more detail in part 4. Thanks for reading!