How to get an Avro-serialised message from an F# record?

I have the following in F#:

type HitType = {
  hitHostname                                   : string;
  hitMemoryUsed                                 : float;
  hitPage                                       : string;
}

I try to serialize it in Avro:

  let specificDatumWriter : SpecificDatumWriter<HitType> =
    SpecificDatumWriter(avroSchema)

  let getAvroMsg msg =
    // `use` disposes the stream once the bytes have been copied out
    use memStream =
      new MemoryStream(256)
    let encoder =
      BinaryEncoder(memStream)
    specificDatumWriter.Write(msg, encoder)
    encoder.Flush()
    memStream.ToArray()

When I try to get the Avro bytes the following way:

let hit : HitType = { hitHostname = "abc"; hitMemoryUsed = 0.123; hitPage = "None" }
getAvroMsg hit

I get the following exception:

Unable to cast object of type 'TrckFsharp.HitType' to type 'Avro.Specific.ISpecificRecord'.: InvalidCastException
   at Avro.Specific.SpecificDatumWriter`1.WriteRecordFields(Object recordObj, RecordFieldWriter[] writers, Encoder encoder)
   at TrckFsharp.Avro.getAvroMsg(HitType msg) in project-dev/Avro.fs:line 40
   at TrckFsharp.Handler.trckGet(APIGatewayProxyRequest request) in project/Handler.fs:line 77
   at TrckFsharp.Handler.handler(APIGatewayProxyRequest request) in project/Handler.fs:line 137
   at lambda_method(Closure , Stream , Stream , LambdaContextInternal )

Not sure why. Isn't this the way to produce a message with Avro?

Update1:

Avro schema:

{
  "namespace": "com.lambdainsight.hit",
  "type": "record",
  "name": "Hit",
  "fields": [
     {"name": "hitHostname",                                     "type": "string" },
     {"name": "hitMemoryUsed",                                   "type": "float"  },
     {"name": "hitPage",                                         "type": "string" }
  ]
 }

I have uploaded everything into a single repo:

https://github.com/LambdaInsight/avro-test/

1 answer

  • answered 2019-12-09 15:54 Stuart

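    The Avro SDK's SpecificDatumWriter requires HitType to implement ISpecificRecord. An F# record does not implement that interface, hence the InvalidCastException. That is unfortunate, because it would be nice to use a record type.

    For now you can make it work with this HitType class, which implements the interface by hand: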

    type HitType() =
        let schema = Schema.Parse("""{
      "namespace": "com.lambdainsight.hit",
      "type": "record",
      "name": "Hit",
      "fields": [
         {"name": "hitHostname",   "type": "string" },
         {"name": "hitMemoryUsed", "type": "float"  },
         {"name": "hitPage",       "type": "string" }
      ]
     }""")
    
        member val HitHostname = "" with get, set
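        // Avro "float" is 32-bit, so this is an F# single (float32), not the 64-bit float used in the question
        member val HitMemoryUsed = 0.f with get, set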
        member val HitPage = "" with get, set
    
        // Field positions in Get/Put follow the order of "fields" in the schema
        interface ISpecificRecord with
            member this.Schema with get() = schema
            member this.Get(fieldPos : int) = 
                match fieldPos with
                | 0 -> this.HitHostname :> obj
                | 1 -> this.HitMemoryUsed :> obj
                | 2 -> this.HitPage :> obj
                | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Get()" fieldPos))
            member this.Put(fieldPos : int, fieldValue : obj) = 
                match fieldPos with
                | 0 -> this.HitHostname <- string fieldValue
                | 1 -> this.HitMemoryUsed <- fieldValue :?> single
                | 2 -> this.HitPage <- string fieldValue
                | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Put()" fieldPos))
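
If you would rather keep HitType as a plain F# record, one possible alternative is to copy it into a GenericRecord and write that with GenericDatumWriter, which does not need ISpecificRecord. This is only a sketch, assuming the Apache.Avro package is referenced. The names hitSchema, toGenericRecord and getAvroMsgGeneric are made up for this example, and hitMemoryUsed is declared as float32 here so that it matches the schema's 32-bit "float".

```fsharp
// In an .fsx script, reference the package first: #r "nuget: Apache.Avro"
open System.IO
open Avro
open Avro.Generic
open Avro.IO

// Same shape as the record in the question, except hitMemoryUsed is
// float32 so that it matches the 32-bit "float" in the schema.
type HitType = {
  hitHostname   : string
  hitMemoryUsed : float32
  hitPage       : string
}

// Schema.Parse returns a Schema; a "record" schema is a RecordSchema.
let hitSchema =
  Schema.Parse("""{
    "namespace": "com.lambdainsight.hit",
    "type": "record",
    "name": "Hit",
    "fields": [
      {"name": "hitHostname",   "type": "string" },
      {"name": "hitMemoryUsed", "type": "float"  },
      {"name": "hitPage",       "type": "string" }
    ]
  }""") :?> RecordSchema

// Copy the record's fields into a GenericRecord, keyed by schema field name.
let toGenericRecord (hit : HitType) =
  let record = GenericRecord(hitSchema)
  record.Add("hitHostname", box hit.hitHostname)
  record.Add("hitMemoryUsed", box hit.hitMemoryUsed)
  record.Add("hitPage", box hit.hitPage)
  record

// Hypothetical counterpart of getAvroMsg from the question.
let getAvroMsgGeneric (hit : HitType) =
  use memStream = new MemoryStream(256)
  let encoder = BinaryEncoder(memStream)
  let writer = GenericDatumWriter<GenericRecord>(hitSchema)
  writer.Write(toGenericRecord hit, encoder)
  encoder.Flush()
  memStream.ToArray()

let bytes =
  getAvroMsgGeneric { hitHostname = "abc"; hitMemoryUsed = 0.123f; hitPage = "None" }
```

The trade-off is that the field names are repeated as strings in toGenericRecord, so a typo or a schema change only shows up at runtime, whereas the ISpecificRecord class above keeps the mapping in one place.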