Shortest Path seeks to find the minimum path length between any two nodes in a graph. The algorithm is fundamental since it scales linearly with the number of nodes, when efficiently programmed, and is equivalent to many problems in graph theory which may at first seem intractable. Since HPCC/ECL operates so well on huge amounts of data it is natural to investigate the computationally complex problems of graph theory, and the Dijkstra algorithm is in some ways the simplest case. Unfortunately it is difficult to efficiently parallelize the algorithm - if such is possible - and this example code is not particularly efficient but presented as an example.

The algorithm is conceptually simple: construct a network of paths by processing branches from each node in the order in which the node is reached by the path at that point in the process. The conceptual danger is in confusing this with the normal "greedy algorithm" which would select the shortest edge from the network - each selection must be the **shortest path to that point**, not the shortest edge. The greedy algorithm does work for a network spanning tree, but will fail for shortest paths in some graphs.

The input file here consists of a list of edges, (from node, to node, distance), all integers, and constructs the node set from the edges. I assume a complete graph (directed with edges going each way and fully connected).

Without explaining each step in detail, here is the code:

#workunit('name', 'dijkstra');

#DECLARE (SanityCheck);

#DECLARE (numNodes);

#SET (SanityCheck,100);

#SET (numNodes,50);

noderec:= record

integer nIndex; //index of node

integer nTime:=999999; //time at which node is to be (or was) processed

string nPath:=''; //generated path to this node

boolean completed:=false;

end;

filerec:=record

integer fromNode; //origin node

integer toNode; //destination node

integer distance;

end;

edgerec:=record

integer fromNode; //origin node

integer toNode; //destination node

integer distance; //length of edge

integer inTime:=999999; //temporary storing node time from origin to apply in join to destination node

string inPath:=''; //temporary storing path from origin to apply in join to destination node

integer outTime:=999999; //temporary storing node time to destination to apply in join to destination node

end;

edgesFile := DATASET('~bill::dijkstra::edges',

fileRec,

CSV(HEADING(1),

SEPARATOR([',']),

QUOTE('"'),

TERMINATOR(['\n','\r\n','\n\r'])));

edges:=project(edgesFile,edgeRec)(fromNode<%numNodes%,toNode<%numNodes%,fromNode!=toNode);

/*build Nodes dataset from the edges table */

nodeRec Nodes:=project(table(edges,{edges.fromNode},fromNode),

transform(nodeRec,

self.nIndex:=left.fromNode,

self.nTime:=if(left.fromNode=0,0,999999), //set first node to zero time

self.nPath:=''

));

EdgeTimeIsLess(nodeRec ToNode,edgeRec Edge):=if(Edge.inTime+Edge.distance <ToNode.nTime,true,false);

/*from Edges get a list of linked nodes for the processing node */

edgeRec getNodeEdges(edgeRec Edge,nodeRec Node):=transform

self.inTime:=Node.nTime;

self.inPath:=Node.nPath;

self.outTime:=self.inTime+Edge.distance;

self:=Edge;

end;

//function returns min time of non-completed nodes

minTime(dataset(nodeRec) currentNodes) :=min(currentNodes(completed=false),currentNodes.nTime);

noderec setEdgeNode(nodeRec Node,edgeRec Edge):=transform

self.nIndex:=Node.nIndex,

self.nTime:=if(EdgeTimeIsLess(Node,Edge),Edge.inTime+Edge.distance,Node.nTime),

self.nPath:=if(EdgeTimeIsLess(Node,Edge),Edge.inPath +','+ Edge.fromNode,Node.nPath),

self:=Node;

end;

/*functions of each step in DK algorithm */

//get next nodes based on the times nodes are to become active. Completed nodes excluded by minTime function

nodeRec getNextNodes(dataset(nodeRec) currentNodesX) :=function

mTime:=minTime(currentNodesX);

return currentNodesX(nTime=mTime);

end;

//get the edges to be processed

edgerec getActiveEdges(dataset(edgerec) edges, dataset(noderec) ActiveNodesX):=function

AE:=join(edges,ActiveNodesX,left.fromNode=right.nIndex,getNodeEdges(left,right)); // all edges from active Nodes

return dedup(

sort(

join(edges,ActiveNodesX,left.fromNode=right.nIndex,getNodeEdges(left,right)),

toNode,outTime),

toNode); //retain edges with least time to each destination node

end;

//set the completed flag for nodes that are now processed

nodeRec setNodeCompleteFlag(dataset(noderec) currentNodesX,dataset (noderec) ActiveNodesX):=function

return join(currentNodesX,ActiveNodesX,left.nIndex=right.nIndex,TRANSFORM(

nodeRec,self.completed:=right.nIndex=left.nIndex or left.completed,SELF := left),left outer);

end;

//operating on non-completed Eligible nodes at the other end of the edges.

//set the time and path for the node. But only the first for each node

noderec processTheNodes(dataset(noderec) EligibleNodesX,dataset(edgerec) ActiveEdgesX):=function

return join(EligibleNodesX(completed=false),ActiveEdgesX,right.toNode=left.nIndex,setEdgeNode(left,right));

end;

//the nodes which were just edge-processed, and copy the rest from the current set

noderec newCurrentNodes(dataset(noderec) ProcessNodesX, dataset(noderec) EligibleNodesX):=function

return ProcessNodesX + join(EligibleNodesX,ProcessNodesX,left.nIndex=right.nIndex,TRANSFORM(nodeRec,SELF := left),left only);

end;

nodeRec processNodes(dataset(noderec) EligibleNodesX,dataset(edgerec) ActiveEdgesX):=function

//first process nodes

processNodesX:=processTheNodes(EligibleNodesX,ActiveEdgesX);

//return the new set of current nodes: those just processed and the rest from the current set

return newCurrentNodes(ProcessNodesX,EligibleNodesX);

end;

noderec doDkStep(dataset(noderec) currentNodesX):=function

ActiveNodesX:=getNextNodes(currentNodesX);

ActiveEdgesX:=getActiveEdges(edges,ActiveNodesX);

EligibleNodesX:=setNodeCompleteFlag(currentNodesX,ActiveNodesX);

currentNodesY:=processNodes(EligibleNodesX,ActiveEdgesX);

return currentNodesY;

end;

minpath(dataset(noderec) ds):=loop(ds, EXISTS(ROWS(LEFT)(completed=false)) and counter<%SanityCheck%, doDkStep(ROWS(LEFT)) );

minpath(Nodes);

Breaking it down, the doDkStep function represents the usual steps of the Dijkstra algorithm.

- Get the next nodes to operate on "getNextNodes()" which selects nodes based on the minimum stored path length (time) of all nodes that aren't already completed
- Get the set of edges of these nodes "getActiveEdges()" which includes edges to nodes not yet completed
- From these edges, select eligible destination Nodes "setNodeCompleteFlag()" and set their completed flags
- Process these nodes by comparing the path to the node with the minimum previously discovered "processNodes()", and return the new full set of nodes for the next iteration

When coding this, it is useful to first "brute force" the loop iterations using the same function, so as to have access to the graph state at each step. For example;

currentNodes0:=Nodes;

currentNodes1:=doDkStep(currentNodes0);

currentNodes2:=doDkStep(currentNodes1);

currentNodes3:=doDkStep(currentNodes2);

currentNodes4:=doDkStep(currentNodes3);

//etc

Note, this implementation suffers from the need to SORT and DEDUP on the edge set, in order to sequence the Nodes to be processed. By preserving all path information, it also fails to reduce the dataset for subsequent LOOP iterations. Finally, since the paths are not predicted beforehand, the data is not optimally distributed between nodes for parallel operations.