Применение действия к JavaPairRDD

Как правильно применить функцию JavaPairRDD.foreachPartition?
Я новичок в apache spark и пытаюсь запустить пользовательский алгоритм ближайшего соседа на RDD, который был разделен на 2 части с помощью пользовательского разделителя. JavaPairRDD содержит детали графа и случайный объект, созданный на графе.

Согласно моей логике, я строю подграфы для каждого раздела и запускаю пользовательский алгоритм на каждом подграфе. Кажется, что он работает «хотя и не правильно». Я не уверен, что это правильный способ применения действий в каждом разделе. Я добавляю свой код и результаты. Комментарии и предложения будут высоко оценены.



// <Partition_Index_Key, Map<Source_vertex, Map<Destination Vertex, Tuple2<Edge_Length, ArrayList of Random Objects>>
            JavaPairRDD<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> adjVertForSubgraphsRDD = jscontext
                    .parallelizePairs(adjacentVerticesForSubgraphs)
                    .partitionBy(new CustomPartitioner(CustomPartitionSize));

            //applying foreachPartition action on JavaPairRDD
            adjVertForSubgraphsRDD.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>>>() {

                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;

                        @Override
                        public void call(
                                Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>> tupleRow)
                                throws Exception {
                            int sourceVertex;
                            int destVertex;
                            double edgeLength;

                            int roadObjectId;
                            boolean roadObjectType;
                            double distanceFromStart;

                            CoreGraph subgraph0 = new CoreGraph();
                            CoreGraph subgraph1 = new CoreGraph();

                            while (tupleRow.hasNext()) {


                                Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>> newMap = tupleRow.next()
                                        ._2();

                                if ((Integer.parseInt(String.valueOf(tupleRow.next()._1())) == 0)) {

                                    for (Object srcVertex : newMap.keySet()) {

                                        for (Object dstVertex : newMap.get(srcVertex).keySet()) {
                                            if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph0.addEdge(sourceVertex, destVertex, edgeLength);

                                                for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
                                                        .size(); i++) {
                                                    int currentEdgeId = subgraph0.getEdgeId(sourceVertex, destVertex);

                                                    roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getObjectId();
                                                    roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getType();
                                                    distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getDistanceFromStartNode();
                                                    RoadObject rn0 = new RoadObject();
                                                    rn0.setObjId(roadObjectId);
                                                    rn0.setType(roadObjectType);
                                                    rn0.setDistanceFromStartNode(distanceFromStart);

                                                    subgraph0.addObjectOnEdge(currentEdgeId, rn0);
                                                }
                                            } else {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph0.addEdge(sourceVertex, destVertex, edgeLength);
                                            }

                                        }
                                    }

                                } else if ((Integer.parseInt(String.valueOf(tupleRow.next()._1())) == 1)) {

                                    for (Object srcVertex : newMap.keySet()) {
                                        for (Object dstVertex : newMap.get(srcVertex).keySet()) {
                                            if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph1.addEdge(sourceVertex, destVertex, edgeLength);

                                                for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
                                                        .size(); i++) {
                                                    int currentEdgeId = subgraph1.getEdgeId(sourceVertex, destVertex);

                                                    roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getObjectId();
                                                    roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getType();
                                                    distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getDistanceFromStartNode();
                                                    RoadObject rn1 = new RoadObject();
                                                    rn1.setObjId(roadObjectId);
                                                    rn1.setType(roadObjectType);
                                                    rn1.setDistanceFromStartNode(distanceFromStart);

                                                    subgraph1.addObjectOnEdge(currentEdgeId, rn1);
                                                }
                                            } else {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph1.addEdge(sourceVertex, destVertex, edgeLength);
                                            }

                                        }
                                    }
                                }

                            }
                            // Straight forward nearest neighbor algorithm from each true to false.
                            ANNNaive ann = new ANNNaive();
                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg0 = ann.compute(subgraph0, true);
                            System.out.println("for subgraph0");
                            System.out.println(nearestNeighorPairsSubg0);
                            System.err.println("-------------------------------");

                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg1 = ann.compute(subgraph1, true);
                            System.out.println("for subgraph1");
                            System.out.println(nearestNeighorPairsSubg1);
                            System.err.println("-------------------------------");

                        }
                    });`



Войти в полноэкранный режим Выход из полноэкранного режима

Оцените статью
devanswers.ru
Добавить комментарий